From 109e1d8ad24de24f90c0c629490b8a7f648e741d Mon Sep 17 00:00:00 2001
From: Dmitry Markin
Date: Wed, 2 Aug 2023 16:01:35 +0300
Subject: [PATCH] Get rid of `Peerset` compatibility layer (#14337)

* Move bootnodes from individual `SetConfig`s to `PeersetConfig`
* Move `SetId` & `SetConfig` from `peerset` to `protocol_controller`
* Remove unused `DropReason`
* Move `Message` & `IncomingIndex` from `peerset` to `protocol_controller`
* Restore running fuzz test
* Get rid of `Peerset` in `fuzz` test
* Spawn runners instead of manual polling in `fuzz` test
* Migrate `Protocol` from `Peerset` to `PeerStore` & `ProtocolController`
* Migrate `NetworkService` from `Peerset` to `PeerStore` & `ProtocolController`
* Migrate `Notifications` from `Peerset` to `ProtocolController`s
* Migrate `Notifications` tests from `Peerset` to `ProtocolController`
* Fix compilation of `NetworkService` & `Protocol`
* Fix borrowing issues in `Notifications`
* Migrate `RequestResponse` from `Peerset` to `PeerStore`
* rustfmt
* Migrate request-response tests from `Peerset` to `PeerStore`
* Migrate `reconnect_after_disconnect` test to `PeerStore` & `ProtocolController`
* Fix `Notifications` tests
* Remove `Peerset` completely
* Fix bug with counting sync peers in `Protocol`
* Eliminate indirect calls to `PeerStore` via `Protocol`
* Eliminate indirect calls to `ProtocolController` via `Protocol`
* Handle `Err` outcome from `remove_peers_from_reserved_set`
* Add note about disconnecting sync peers in `Protocol`
* minor: remove unneeded `clone()`
* minor: extra comma removed
* minor: use `Stream` API of `from_protocol_controllers` channel
* minor: remove TODO
* minor: replace `.map().flatten()` with `.flat_map()`
* minor: update `ProtocolController` docs
* rustfmt
* Apply suggestions from code review

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>

* Extract `MockPeerStore` to `mock.rs`
* Move `PeerStore` initialization to `build_network`
* minor: remove unused import
* minor: clarify error message
* Convert `syncs_header_only_forks` test into single-threaded

---------

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>
---
 .../grandpa/src/communication/tests.rs        |   8 +-
 client/network-gossip/src/bridge.rs           |  13 +-
 client/network-gossip/src/lib.rs              |   6 +
 client/network-gossip/src/state_machine.rs    |   8 +-
 client/network/src/behaviour.rs               |   6 +-
 client/network/src/config.rs                  |   9 +
 client/network/src/lib.rs                     |   8 +-
 client/network/src/mock.rs                    |  55 +++
 client/network/src/peer_store.rs              |   6 +
 client/network/src/peerset.rs                 | 394 -----------------
 client/network/src/protocol.rs                | 192 ++-------
 .../src/protocol/notifications/behaviour.rs   | 395 +++++++++---------
 .../src/protocol/notifications/tests.rs       |  74 ++--
 client/network/src/protocol_controller.rs     | 182 +++++---
 client/network/src/request_responses.rs       | 235 +++-------
 client/network/src/service.rs                 | 266 +++++++-----
 client/network/src/service/traits.rs          |  20 +-
 client/network/statement/src/lib.rs           |   5 +-
 client/network/sync/src/service/mock.rs       |   6 +-
 .../network/test/src/{peerset.rs => fuzz.rs}  | 132 +++---
 client/network/test/src/lib.rs                |  10 +
 client/network/test/src/service.rs            |   8 +
 client/network/test/src/sync.rs               |   5 +-
 client/network/transactions/src/lib.rs        |   5 +-
 client/offchain/src/api.rs                    |   6 +-
 client/offchain/src/lib.rs                    |   6 +-
 client/service/src/builder.rs                 |  14 +
 27 files changed, 859 insertions(+), 1215 deletions(-)
 create mode 100644 client/network/src/mock.rs
 delete mode 100644 client/network/src/peerset.rs
 rename 
client/network/test/src/{peerset.rs => fuzz.rs} (78%) diff --git a/client/consensus/grandpa/src/communication/tests.rs b/client/consensus/grandpa/src/communication/tests.rs index 504fde74be603..10c4772fc76d6 100644 --- a/client/consensus/grandpa/src/communication/tests.rs +++ b/client/consensus/grandpa/src/communication/tests.rs @@ -114,7 +114,13 @@ impl NetworkPeers for TestNetwork { unimplemented!(); } - fn remove_peers_from_reserved_set(&self, _protocol: ProtocolName, _peers: Vec) {} + fn remove_peers_from_reserved_set( + &self, + _protocol: ProtocolName, + _peers: Vec, + ) -> Result<(), String> { + unimplemented!(); + } fn sync_num_connected(&self) -> usize { unimplemented!(); diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index f3ad7983482b2..f14218756ea0e 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -238,10 +238,7 @@ impl Future for GossipEngine { SyncEvent::PeerConnected(remote) => this.network.add_set_reserved(remote, this.protocol.clone()), SyncEvent::PeerDisconnected(remote) => - this.network.remove_peers_from_reserved_set( - this.protocol.clone(), - vec![remote], - ), + this.network.remove_set_reserved(remote, this.protocol.clone()), }, // The sync event stream closed. Do the same for [`GossipValidator`]. Poll::Ready(None) => { @@ -414,7 +411,13 @@ mod tests { unimplemented!(); } - fn remove_peers_from_reserved_set(&self, _protocol: ProtocolName, _peers: Vec) {} + fn remove_peers_from_reserved_set( + &self, + _protocol: ProtocolName, + _peers: Vec, + ) -> Result<(), String> { + unimplemented!(); + } fn sync_num_connected(&self) -> usize { unimplemented!(); diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index d126f85646e6c..4c15995379ed3 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -89,6 +89,12 @@ pub trait Network: NetworkPeers + NetworkEventStream + NetworkNotific log::error!(target: "gossip", "add_set_reserved failed: {}", err); } } + fn remove_set_reserved(&self, who: PeerId, protocol: ProtocolName) { + let result = self.remove_peers_from_reserved_set(protocol, iter::once(who).collect()); + if let Err(err) = result { + log::error!(target: "gossip", "remove_set_reserved failed: {}", err); + } + } } impl Network for T where T: NetworkPeers + NetworkEventStream + NetworkNotification {} diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 3eb7cf6844245..39c0a78e0d350 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -640,7 +640,13 @@ mod tests { unimplemented!(); } - fn remove_peers_from_reserved_set(&self, _protocol: ProtocolName, _peers: Vec) {} + fn remove_peers_from_reserved_set( + &self, + _protocol: ProtocolName, + _peers: Vec, + ) -> Result<(), String> { + unimplemented!(); + } fn sync_num_connected(&self) -> usize { unimplemented!(); diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index e9b55a48e6d8d..005dd146b89aa 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -20,7 +20,7 @@ use crate::{ discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, event::DhtEvent, peer_info, - peerset::PeersetHandle, + peer_store::PeerStoreHandle, protocol::{CustomMessageOutcome, NotificationsSink, Protocol}, request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure}, types::ProtocolName, @@ -173,7 +173,7 @@ impl Behaviour { 
local_public_key: PublicKey, disco_config: DiscoveryConfig, request_response_protocols: Vec, - peerset: PeersetHandle, + peer_store_handle: PeerStoreHandle, connection_limits: ConnectionLimits, external_addresses: Arc>>, ) -> Result { @@ -188,7 +188,7 @@ impl Behaviour { connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits), request_responses: request_responses::RequestResponsesBehaviour::new( request_response_protocols.into_iter(), - peerset, + Box::new(peer_store_handle), )?, }) } diff --git a/client/network/src/config.rs b/client/network/src/config.rs index e13b6ac0a370d..7964f12527b66 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -32,6 +32,7 @@ pub use crate::{ pub use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; +use crate::peer_store::PeerStoreHandle; use codec::Encode; use prometheus_endpoint::Registry; use zeroize::Zeroize; @@ -270,6 +271,11 @@ impl NonReservedPeerMode { _ => None, } } + + /// If we are in "reserved-only" peer mode. + pub fn is_reserved_only(&self) -> bool { + matches!(self, NonReservedPeerMode::Deny) + } } /// The configuration of a node's secret key, describing the type of key @@ -674,6 +680,9 @@ pub struct Params { /// Network layer configuration. pub network_config: FullNetworkConfiguration, + /// Peer store with known nodes, peer reputations, etc. + pub peer_store: PeerStoreHandle, + /// Legacy name of the protocol to use on the wire. Should be different for each chain. pub protocol_id: ProtocolId, diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 9f528b8bec38c..ee30759687841 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -243,18 +243,20 @@ //! More precise usage details are still being worked on and will likely change in the future. mod behaviour; -mod peer_store; -mod peerset; mod protocol; -mod protocol_controller; mod service; +#[cfg(test)] +mod mock; + pub mod config; pub mod discovery; pub mod error; pub mod event; pub mod network_state; pub mod peer_info; +pub mod peer_store; +pub mod protocol_controller; pub mod request_responses; pub mod transport; pub mod types; diff --git a/client/network/src/mock.rs b/client/network/src/mock.rs new file mode 100644 index 0000000000000..bc596b0fa579e --- /dev/null +++ b/client/network/src/mock.rs @@ -0,0 +1,55 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Mocked components for tests. + +use crate::{peer_store::PeerStoreProvider, protocol_controller::ProtocolHandle, ReputationChange}; +use libp2p::PeerId; +use std::collections::HashSet; + +/// No-op `PeerStore`. +#[derive(Debug)] +pub struct MockPeerStore {} + +impl PeerStoreProvider for MockPeerStore { + fn is_banned(&self, _peer_id: &PeerId) -> bool { + // Make sure that the peer is not banned. 
+ false + } + + fn register_protocol(&self, _protocol_handle: ProtocolHandle) { + // Make sure not to fail. + } + + fn report_disconnect(&mut self, _peer_id: PeerId) { + // Make sure not to fail. + } + + fn report_peer(&mut self, _peer_id: PeerId, _change: ReputationChange) { + // Make sure not to fail. + } + + fn peer_reputation(&self, _peer_id: &PeerId) -> i32 { + // Make sure that the peer is not banned. + 0 + } + + fn outgoing_candidates(&self, _count: usize, _ignored: HashSet<&PeerId>) -> Vec { + unimplemented!() + } +} diff --git a/client/network/src/peer_store.rs b/client/network/src/peer_store.rs index 59886c335784b..2f3d4a1fd1a0b 100644 --- a/client/network/src/peer_store.rs +++ b/client/network/src/peer_store.rs @@ -16,6 +16,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! [`PeerStore`] manages peer reputations and provides connection candidates to +//! [`crate::protocol_controller::ProtocolController`]. + use libp2p::PeerId; use log::trace; use parking_lot::Mutex; @@ -49,6 +52,7 @@ const INVERSE_DECREMENT: i32 = 50; /// remove it, once the reputation value reaches 0. const FORGET_AFTER: Duration = Duration::from_secs(3600); +/// Trait providing peer reputation management and connection candidates. pub trait PeerStoreProvider: Debug + Send { /// Check whether the peer is banned. fn is_banned(&self, peer_id: &PeerId) -> bool; @@ -69,6 +73,7 @@ pub trait PeerStoreProvider: Debug + Send { fn outgoing_candidates(&self, count: usize, ignored: HashSet<&PeerId>) -> Vec; } +/// Actual implementation of peer reputations and connection candidates provider. #[derive(Debug, Clone)] pub struct PeerStoreHandle { inner: Arc>, @@ -289,6 +294,7 @@ impl PeerStoreInner { } } +/// Worker part of [`PeerStoreHandle`] #[derive(Debug)] pub struct PeerStore { inner: Arc>, diff --git a/client/network/src/peerset.rs b/client/network/src/peerset.rs deleted file mode 100644 index fd57175dd77cd..0000000000000 --- a/client/network/src/peerset.rs +++ /dev/null @@ -1,394 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be -//! connected to. -//! -//! The PSM handles *sets* of nodes. A set of nodes is defined as the nodes that are believed to -//! support a certain capability, such as handling blocks and transactions of a specific chain, -//! or collating a certain parachain. -//! -//! For each node in each set, the peerset holds a flag specifying whether the node is -//! connected to us or not. -//! -//! This connected/disconnected status is specific to the node and set combination, and it is for -//! example possible for a node to be connected through a specific set but not another. -//! -//! 
In addition, for each, set, the peerset also holds a list of reserved nodes towards which it -//! will at all time try to maintain a connection with. - -use crate::{ - peer_store::{PeerStore, PeerStoreHandle, PeerStoreProvider}, - protocol_controller::{ProtocolController, ProtocolHandle}, -}; - -use futures::{ - channel::oneshot, - future::{join_all, BoxFuture, JoinAll}, - prelude::*, - stream::Stream, -}; -use log::debug; -use sc_network_common::types::ReputationChange; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use serde_json::json; -use std::{ - collections::HashSet, - pin::Pin, - task::{Context, Poll}, -}; - -use libp2p::PeerId; - -pub const LOG_TARGET: &str = "peerset"; - -#[derive(Debug)] -enum Action { - AddReservedPeer(SetId, PeerId), - RemoveReservedPeer(SetId, PeerId), - SetReservedPeers(SetId, HashSet), - SetReservedOnly(SetId, bool), - ReportPeer(PeerId, ReputationChange), - AddKnownPeer(PeerId), - PeerReputation(PeerId, oneshot::Sender), -} - -/// Identifier of a set in the peerset. -/// -/// Can be constructed using the `From` trait implementation based on the index of the set -/// within [`PeersetConfig::sets`]. For example, the first element of [`PeersetConfig::sets`] is -/// later referred to with `SetId::from(0)`. It is intended that the code responsible for building -/// the [`PeersetConfig`] is also responsible for constructing the [`SetId`]s. -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct SetId(usize); - -impl SetId { - pub const fn from(id: usize) -> Self { - Self(id) - } -} - -impl From for SetId { - fn from(id: usize) -> Self { - Self(id) - } -} - -impl From for usize { - fn from(id: SetId) -> Self { - id.0 - } -} - -/// Shared handle to the peer set manager (PSM). Distributed around the code. -#[derive(Debug, Clone)] -pub struct PeersetHandle { - tx: TracingUnboundedSender, -} - -impl PeersetHandle { - /// Adds a new reserved peer. The peerset will make an effort to always remain connected to - /// this peer. - /// - /// Has no effect if the node was already a reserved peer. - /// - /// > **Note**: Keep in mind that the networking has to know an address for this node, - /// > otherwise it will not be able to connect to it. - pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id)); - } - - /// Remove a previously-added reserved peer. - /// - /// Has no effect if the node was not a reserved peer. - pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id)); - } - - /// Sets whether or not the peerset only has connections with nodes marked as reserved for - /// the given set. - pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) { - let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved)); - } - - /// Set reserved peers to the new set. - pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet) { - let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids)); - } - - /// Reports an adjustment to the reputation of the given peer. - pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) { - let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); - } - - /// Add a peer to the list of known peers. 
- pub fn add_known_peer(&self, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::AddKnownPeer(peer_id)); - } - - /// Returns the reputation value of the peer. - pub async fn peer_reputation(self, peer_id: PeerId) -> Result { - let (tx, rx) = oneshot::channel(); - - let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx)); - - // The channel can only be closed if the peerset no longer exists. - rx.await.map_err(|_| ()) - } -} - -/// Message that can be sent by the peer set manager (PSM). -#[derive(Debug, PartialEq)] -pub enum Message { - /// Request to open a connection to the given peer. From the point of view of the PSM, we are - /// immediately connected. - Connect { - /// Set id to connect on. - set_id: SetId, - /// Peer to connect to. - peer_id: PeerId, - }, - - /// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`. - Drop { - /// Set id to disconnect on. - set_id: SetId, - /// Peer to disconnect from. - peer_id: PeerId, - }, - - /// Equivalent to `Connect` for the peer corresponding to this incoming index. - Accept(IncomingIndex), - - /// Equivalent to `Drop` for the peer corresponding to this incoming index. - Reject(IncomingIndex), -} - -/// Opaque identifier for an incoming connection. Allocated by the network. -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct IncomingIndex(pub u64); - -impl From for IncomingIndex { - fn from(val: u64) -> Self { - Self(val) - } -} - -/// Configuration to pass when creating the peer set manager. -#[derive(Debug)] -pub struct PeersetConfig { - /// List of sets of nodes the peerset manages. - pub sets: Vec, -} - -/// Configuration for a single set of nodes. -#[derive(Debug)] -pub struct SetConfig { - /// Maximum number of ingoing links to peers. - pub in_peers: u32, - - /// Maximum number of outgoing links to peers. - pub out_peers: u32, - - /// List of bootstrap nodes to initialize the set with. - /// - /// > **Note**: Keep in mind that the networking has to know an address for these nodes, - /// > otherwise it will not be able to connect to them. - pub bootnodes: Vec, - - /// Lists of nodes we should always be connected to. - /// - /// > **Note**: Keep in mind that the networking has to know an address for these nodes, - /// > otherwise it will not be able to connect to them. - pub reserved_nodes: HashSet, - - /// If true, we only accept nodes in [`SetConfig::reserved_nodes`]. - pub reserved_only: bool, -} - -/// Side of the peer set manager owned by the network. In other words, the "receiving" side. -/// -/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never -/// errors. -pub struct Peerset { - /// Peer reputation store handle. - peer_store_handle: PeerStoreHandle, - /// Peer reputation store. - peer_store_future: BoxFuture<'static, ()>, - /// Protocol handles. - protocol_handles: Vec, - /// Protocol controllers responsible for connections, per `SetId`. - protocol_controller_futures: JoinAll>, - /// Commands sent from protocol controllers to `Notifications`. The size of this vector never - /// changes. - from_controllers: TracingUnboundedReceiver, - /// Receiver for messages from the `PeersetHandle` and from `to_self`. - from_handle: TracingUnboundedReceiver, -} - -impl Peerset { - /// Builds a new peerset from the given configuration. 
- pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) { - let default_set_config = &config.sets[0]; - let peer_store = PeerStore::new(default_set_config.bootnodes.clone()); - - let (to_notifications, from_controllers) = - tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000); - - let controllers = config - .sets - .into_iter() - .enumerate() - .map(|(set, set_config)| { - ProtocolController::new( - SetId::from(set), - set_config, - to_notifications.clone(), - Box::new(peer_store.handle()), - ) - }) - .collect::>(); - - let (protocol_handles, protocol_controllers): (Vec, Vec<_>) = - controllers.into_iter().unzip(); - - let (tx, from_handle) = tracing_unbounded("mpsc_peerset_messages", 10_000); - - let handle = PeersetHandle { tx }; - - let protocol_controller_futures = - join_all(protocol_controllers.into_iter().map(|c| c.run().boxed())); - - let peerset = Peerset { - peer_store_handle: peer_store.handle(), - peer_store_future: peer_store.run().boxed(), - protocol_handles, - protocol_controller_futures, - from_controllers, - from_handle, - }; - - (peerset, handle) - } - - /// Returns the list of reserved peers. - pub fn reserved_peers(&self, set_id: SetId, pending_response: oneshot::Sender>) { - self.protocol_handles[set_id.0].reserved_peers(pending_response); - } - - /// Indicate that we received an incoming connection. Must be answered either with - /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. - /// - /// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming - /// connection implicitly means `Connect`, but incoming connections aren't cancelled by - /// `dropped`. - // Implementation note: because of concurrency issues, it is possible that we push a `Connect` - // message to the output channel with a `PeerId`, and that `incoming` gets called with the same - // `PeerId` before that message has been read by the user. In this situation we must not answer. - pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) { - self.protocol_handles[set_id.0].incoming_connection(peer_id, index); - } - - /// Indicate that we dropped an active connection with a peer, or that we failed to connect. - /// - /// Must only be called after the PSM has either generated a `Connect` message with this - /// `PeerId`, or accepted an incoming connection with this `PeerId`. - pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, _reason: DropReason) { - self.protocol_handles[set_id.0].dropped(peer_id); - } - - /// Produces a JSON object containing the state of the peerset manager, for debugging purposes. - pub fn debug_info(&mut self) -> serde_json::Value { - // TODO: Check what info we can include here. - // Issue reference: https://github.com/paritytech/substrate/issues/14160. - json!("unimplemented") - } - - /// Returns the number of peers that we have discovered. - pub fn num_discovered_peers(&self) -> usize { - self.peer_store_handle.num_known_peers() - } -} - -impl Stream for Peerset { - type Item = Message; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if let Poll::Ready(msg) = self.from_controllers.poll_next_unpin(cx) { - if let Some(msg) = msg { - return Poll::Ready(Some(msg)) - } else { - debug!( - target: LOG_TARGET, - "All `ProtocolController`s have terminated, terminating `Peerset`." 
- ); - return Poll::Ready(None) - } - } - - while let Poll::Ready(action) = self.from_handle.poll_next_unpin(cx) { - if let Some(action) = action { - match action { - Action::AddReservedPeer(set_id, peer_id) => - self.protocol_handles[set_id.0].add_reserved_peer(peer_id), - Action::RemoveReservedPeer(set_id, peer_id) => - self.protocol_handles[set_id.0].remove_reserved_peer(peer_id), - Action::SetReservedPeers(set_id, peer_ids) => - self.protocol_handles[set_id.0].set_reserved_peers(peer_ids), - Action::SetReservedOnly(set_id, reserved_only) => - self.protocol_handles[set_id.0].set_reserved_only(reserved_only), - Action::ReportPeer(peer_id, score_diff) => - self.peer_store_handle.report_peer(peer_id, score_diff), - Action::AddKnownPeer(peer_id) => self.peer_store_handle.add_known_peer(peer_id), - Action::PeerReputation(peer_id, pending_response) => { - let _ = - pending_response.send(self.peer_store_handle.peer_reputation(&peer_id)); - }, - } - } else { - debug!(target: LOG_TARGET, "`PeersetHandle` was dropped, terminating `Peerset`."); - return Poll::Ready(None) - } - } - - if let Poll::Ready(()) = self.peer_store_future.poll_unpin(cx) { - debug!(target: LOG_TARGET, "`PeerStore` has terminated, terminating `PeerSet`."); - return Poll::Ready(None) - } - - if let Poll::Ready(_) = self.protocol_controller_futures.poll_unpin(cx) { - debug!( - target: LOG_TARGET, - "All `ProtocolHandle`s have terminated, terminating `PeerSet`." - ); - return Poll::Ready(None) - } - - Poll::Pending - } -} - -/// Reason for calling [`Peerset::dropped`]. -#[derive(Debug)] -pub enum DropReason { - /// Substream or connection has been closed for an unknown reason. - Unknown, - /// Substream or connection has been explicitly refused by the target. In other words, the - /// peer doesn't actually belong to this set. - Refused, -} diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 2c41eeccd5803..8cac92f73a48c 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -17,10 +17,10 @@ // along with this program. If not, see . use crate::{ - config::{self, NonReservedPeerMode}, - error, + config, error, + peer_store::{PeerStoreHandle, PeerStoreProvider}, + protocol_controller::{self, SetId}, types::ProtocolName, - ReputationChange, }; use bytes::Bytes; @@ -37,7 +37,7 @@ use libp2p::{ use log::{debug, error, warn}; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; -use sc_utils::mpsc::TracingUnboundedSender; +use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ @@ -62,10 +62,7 @@ pub mod message; pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024; /// Identifier of the peerset for the block announces protocol. -const HARDCODED_PEERSETS_SYNC: crate::peerset::SetId = crate::peerset::SetId::from(0); -/// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or -/// superior to this value corresponds to a user-defined protocol. -const NUM_HARDCODED_PEERSETS: usize = 1; +const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0); mod rep { use crate::ReputationChange as Rep; @@ -79,7 +76,7 @@ type PendingSyncSubstreamValidation = // Lock must always be taken in order declared here. pub struct Protocol { /// Used to report reputation changes. - peerset_handle: crate::peerset::PeersetHandle, + peer_store_handle: PeerStoreHandle, /// Handles opening the unique substream and sending and receiving raw messages. 
behaviour: Notifications, /// List of notifications protocols that have been registered. @@ -90,8 +87,8 @@ pub struct Protocol { /// event to the outer layers, we also shouldn't propagate this "substream closed" event. To /// solve this, an entry is added to this map whenever an invalid handshake is received. /// Entries are removed when the corresponding "substream closed" is later received. - bad_handshake_substreams: HashSet<(PeerId, crate::peerset::SetId)>, - /// Connected peers. + bad_handshake_substreams: HashSet<(PeerId, SetId)>, + /// Connected peers on sync protocol. peers: HashMap, sync_substream_validations: FuturesUnordered, tx: TracingUnboundedSender>, @@ -102,66 +99,17 @@ impl Protocol { /// Create a new instance. pub fn new( roles: Roles, - network_config: &config::NetworkConfiguration, notification_protocols: Vec, block_announces_protocol: config::NonDefaultSetConfig, + peer_store_handle: PeerStoreHandle, + protocol_controller_handles: Vec, + from_protocol_controllers: TracingUnboundedReceiver, tx: TracingUnboundedSender>, - ) -> error::Result<(Self, crate::peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { - let mut known_addresses = Vec::new(); - - let (peerset, peerset_handle) = { - let mut sets = - Vec::with_capacity(NUM_HARDCODED_PEERSETS + notification_protocols.len()); - - let mut default_sets_reserved = HashSet::new(); - for reserved in network_config.default_peers_set.reserved_nodes.iter() { - default_sets_reserved.insert(reserved.peer_id); - - if !reserved.multiaddr.is_empty() { - known_addresses.push((reserved.peer_id, reserved.multiaddr.clone())); - } - } - - let mut bootnodes = Vec::with_capacity(network_config.boot_nodes.len()); - for bootnode in network_config.boot_nodes.iter() { - bootnodes.push(bootnode.peer_id); - } - - // Set number 0 is used for block announces. - sets.push(crate::peerset::SetConfig { - in_peers: network_config.default_peers_set.in_peers, - out_peers: network_config.default_peers_set.out_peers, - bootnodes, - reserved_nodes: default_sets_reserved.clone(), - reserved_only: network_config.default_peers_set.non_reserved_mode == - NonReservedPeerMode::Deny, - }); - - for set_cfg in ¬ification_protocols { - let mut reserved_nodes = HashSet::new(); - for reserved in set_cfg.set_config.reserved_nodes.iter() { - reserved_nodes.insert(reserved.peer_id); - known_addresses.push((reserved.peer_id, reserved.multiaddr.clone())); - } - - let reserved_only = - set_cfg.set_config.non_reserved_mode == NonReservedPeerMode::Deny; - - sets.push(crate::peerset::SetConfig { - in_peers: set_cfg.set_config.in_peers, - out_peers: set_cfg.set_config.out_peers, - bootnodes: Vec::new(), - reserved_nodes, - reserved_only, - }); - } - - crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig { sets }) - }; - + ) -> error::Result { let behaviour = { Notifications::new( - peerset, + protocol_controller_handles, + from_protocol_controllers, // NOTE: Block announcement protocol is still very much hardcoded into `Protocol`. 
// This protocol must be the first notification protocol given to // `Notifications` @@ -181,7 +129,7 @@ impl Protocol { }; let protocol = Self { - peerset_handle: peerset_handle.clone(), + peer_store_handle, behaviour, notification_protocols: iter::once(block_announces_protocol.notifications_protocol) .chain(notification_protocols.iter().map(|s| s.notifications_protocol.clone())) @@ -194,7 +142,7 @@ impl Protocol { _marker: Default::default(), }; - Ok((protocol, peerset_handle, known_addresses)) + Ok(protocol) } /// Returns the list of all the peers we have an open channel to. @@ -202,42 +150,28 @@ impl Protocol { self.behaviour.open_peers() } - /// Returns the number of discovered nodes that we keep in memory. - pub fn num_discovered_peers(&self) -> usize { - self.behaviour.num_discovered_peers() - } - /// Disconnects the given peer if we are connected to it. pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) { if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name) { - self.behaviour.disconnect_peer(peer_id, crate::peerset::SetId::from(position)); - self.peers.remove(peer_id); + // Note: no need to remove a peer from `self.peers` if we are dealing with sync + // protocol, because it will be done when handling + // `NotificationsOut::CustomProtocolClosed`. + self.behaviour.disconnect_peer(peer_id, SetId::from(position)); } else { warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name") } } - /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&mut self) -> serde_json::Value { - self.behaviour.peerset_debug_info() - } - - /// Returns the number of peers we're connected to. + /// Returns the number of peers we're connected to on sync protocol. pub fn num_connected_peers(&self) -> usize { self.peers.len() } - /// Adjusts the reputation of a node. - pub fn report_peer(&self, who: PeerId, reputation: ReputationChange) { - self.peerset_handle.report_peer(who, reputation) - } - /// Set handshake for the notification protocol. pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec) { if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.behaviour - .set_notif_protocol_handshake(crate::peerset::SetId::from(index), handshake); + self.behaviour.set_notif_protocol_handshake(SetId::from(index), handshake); } else { error!( target: "sub-libp2p", @@ -246,81 +180,6 @@ impl Protocol { ); } } - - /// Set whether the syncing peers set is in reserved-only mode. - pub fn set_reserved_only(&self, reserved_only: bool) { - self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); - } - - /// Removes a `PeerId` from the list of reserved peers for syncing purposes. - pub fn remove_reserved_peer(&self, peer: PeerId) { - self.peerset_handle.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); - } - - /// Returns the list of reserved peers. - pub fn reserved_peers(&self, pending_response: oneshot::Sender>) { - self.behaviour.reserved_peers(HARDCODED_PEERSETS_SYNC, pending_response); - } - - /// Adds a `PeerId` to the list of reserved peers for syncing purposes. - pub fn add_reserved_peer(&self, peer: PeerId) { - self.peerset_handle.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer); - } - - /// Sets the list of reserved peers for syncing purposes. 
- pub fn set_reserved_peers(&self, peers: HashSet) { - self.peerset_handle.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers); - } - - /// Sets the list of reserved peers for the given protocol/peerset. - pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet) { - if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle - .set_reserved_peers(crate::peerset::SetId::from(index), peers); - } else { - error!( - target: "sub-libp2p", - "set_reserved_peerset_peers with unknown protocol: {}", - protocol - ); - } - } - - /// Removes a `PeerId` from the list of reserved peers. - pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { - if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle - .remove_reserved_peer(crate::peerset::SetId::from(index), peer); - } else { - error!( - target: "sub-libp2p", - "remove_set_reserved_peer with unknown protocol: {}", - protocol - ); - } - } - - /// Adds a `PeerId` to the list of reserved peers. - pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) { - if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) { - self.peerset_handle.add_reserved_peer(crate::peerset::SetId::from(index), peer); - } else { - error!( - target: "sub-libp2p", - "add_set_reserved_peer with unknown protocol: {}", - protocol - ); - } - } - - /// Notify the protocol that we have learned about the existence of some peer. - /// - /// Can be called multiple times with the same `PeerId`. - pub fn add_known_peer(&mut self, peer_id: PeerId) { - // TODO: get rid of this function and call `Peerset`/`PeerStore` directly - // from `NetworkWorker`. - self.peerset_handle.add_known_peer(peer_id); - } } /// Outcome of an incoming custom message. @@ -507,7 +366,7 @@ impl NetworkBehaviour for Protocol { peer_id, msg, ); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + self.peer_store_handle.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, Err(err) => { @@ -549,7 +408,7 @@ impl NetworkBehaviour for Protocol { err, err2, ); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + self.peer_store_handle.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, } @@ -586,8 +445,7 @@ impl NetworkBehaviour for Protocol { debug!(target: "sync", "Failed to parse remote handshake: {}", err); self.bad_handshake_substreams.insert((peer_id, set_id)); self.behaviour.disconnect_peer(&peer_id, set_id); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); - self.peers.remove(&peer_id); + self.peer_store_handle.report_peer(peer_id, rep::BAD_MESSAGE); CustomMessageOutcome::None }, } diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 2037af4170286..255b637013594 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -17,16 +17,16 @@ // along with this program. If not, see . 
use crate::{ - peerset::DropReason, protocol::notifications::handler::{ self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut, }, + protocol_controller::{self, IncomingIndex, Message, SetId}, types::ProtocolName, }; use bytes::BytesMut; use fnv::FnvHashMap; -use futures::{channel::oneshot, prelude::*}; +use futures::prelude::*; use libp2p::{ core::{ConnectedPoint, Endpoint, Multiaddr}, swarm::{ @@ -39,6 +39,7 @@ use libp2p::{ use log::{debug, error, info, trace, warn}; use parking_lot::RwLock; use rand::distributions::{Distribution as _, Uniform}; +use sc_utils::mpsc::TracingUnboundedReceiver; use smallvec::SmallVec; use std::{ cmp, @@ -105,11 +106,14 @@ pub struct Notifications { /// Notification protocols. Entries never change after initialization. notif_protocols: Vec, + /// Protocol controllers are responsible for peer connections management. + protocol_controller_handles: Vec, + /// Receiver for instructions about who to connect to or disconnect from. - peerset: crate::peerset::Peerset, + from_protocol_controllers: TracingUnboundedReceiver, /// List of peers in our state. - peers: FnvHashMap<(PeerId, crate::peerset::SetId), PeerState>, + peers: FnvHashMap<(PeerId, SetId), PeerState>, /// The elements in `peers` occasionally contain `Delay` objects that we would normally have /// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is @@ -118,9 +122,8 @@ pub struct Notifications { /// /// By design, we never remove elements from this list. Elements are removed only when the /// `Delay` triggers. As such, this stream may produce obsolete elements. - delays: stream::FuturesUnordered< - Pin + Send>>, - >, + delays: + stream::FuturesUnordered + Send>>>, /// [`DelayId`] to assign to the next delay. next_delay_id: DelayId, @@ -131,7 +134,7 @@ pub struct Notifications { /// We generate indices to identify incoming connections. This is the next value for the index /// to use when a connection is incoming. - next_incoming_index: crate::peerset::IncomingIndex, + next_incoming_index: IncomingIndex, /// Events to produce from `poll()`. events: VecDeque>, @@ -230,7 +233,7 @@ enum PeerState { backoff_until: Option, /// Incoming index tracking this connection. - incoming_index: crate::peerset::IncomingIndex, + incoming_index: IncomingIndex, /// List of connections with this peer, and their state. connections: SmallVec<[(ConnectionId, ConnectionState); crate::MAX_CONNECTIONS_PER_PEER]>, @@ -294,12 +297,12 @@ struct IncomingPeer { /// Id of the remote peer of the incoming substream. peer_id: PeerId, /// Id of the set the incoming substream would belong to. - set_id: crate::peerset::SetId, + set_id: SetId, /// If true, this "incoming" still corresponds to an actual connection. If false, then the /// connection corresponding to it has been closed or replaced already. alive: bool, /// Id that the we sent to the peerset. - incoming_id: crate::peerset::IncomingIndex, + incoming_id: IncomingIndex, } /// Event that can be emitted by the `Notifications`. @@ -310,7 +313,7 @@ pub enum NotificationsOut { /// Id of the peer we are connected to. peer_id: PeerId, /// Peerset set ID the substream is tied to. - set_id: crate::peerset::SetId, + set_id: SetId, /// If `Some`, a fallback protocol name has been used rather the main protocol name. /// Always matches one of the fallback names passed at initialization. negotiated_fallback: Option, @@ -332,7 +335,7 @@ pub enum NotificationsOut { /// Id of the peer we are connected to. 
peer_id: PeerId, /// Peerset set ID the substream is tied to. - set_id: crate::peerset::SetId, + set_id: SetId, /// Replacement for the previous [`NotificationsSink`]. notifications_sink: NotificationsSink, }, @@ -343,7 +346,7 @@ pub enum NotificationsOut { /// Id of the peer we were connected to. peer_id: PeerId, /// Peerset set ID the substream was tied to. - set_id: crate::peerset::SetId, + set_id: SetId, }, /// Receives a message on a custom protocol substream. @@ -353,7 +356,7 @@ pub enum NotificationsOut { /// Id of the peer the message came from. peer_id: PeerId, /// Peerset set ID the substream is tied to. - set_id: crate::peerset::SetId, + set_id: SetId, /// Message that has been received. message: BytesMut, }, @@ -362,7 +365,8 @@ pub enum NotificationsOut { impl Notifications { /// Creates a `CustomProtos`. pub fn new( - peerset: crate::peerset::Peerset, + protocol_controller_handles: Vec, + from_protocol_controllers: TracingUnboundedReceiver, notif_protocols: impl Iterator, ) -> Self { let notif_protocols = notif_protocols @@ -378,12 +382,13 @@ impl Notifications { Self { notif_protocols, - peerset, + protocol_controller_handles, + from_protocol_controllers, peers: FnvHashMap::default(), delays: Default::default(), next_delay_id: DelayId(0), incoming: SmallVec::new(), - next_incoming_index: crate::peerset::IncomingIndex(0), + next_incoming_index: IncomingIndex(0), events: VecDeque::new(), } } @@ -391,7 +396,7 @@ impl Notifications { /// Modifies the handshake of the given notifications protocol. pub fn set_notif_protocol_handshake( &mut self, - set_id: crate::peerset::SetId, + set_id: SetId, handshake_message: impl Into>, ) { if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) { @@ -402,29 +407,24 @@ impl Notifications { } } - /// Returns the number of discovered nodes that we keep in memory. - pub fn num_discovered_peers(&self) -> usize { - self.peerset.num_discovered_peers() - } - /// Returns the list of all the peers we have an open channel to. pub fn open_peers(&self) -> impl Iterator { self.peers.iter().filter(|(_, state)| state.is_open()).map(|((id, _), _)| id) } /// Returns true if we have an open substream to the given peer. - pub fn is_open(&self, peer_id: &PeerId, set_id: crate::peerset::SetId) -> bool { + pub fn is_open(&self, peer_id: &PeerId, set_id: SetId) -> bool { self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false) } /// Disconnects the given peer if we are connected to it. - pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: crate::peerset::SetId) { + pub fn disconnect_peer(&mut self, peer_id: &PeerId, set_id: SetId) { trace!(target: "sub-libp2p", "External API => Disconnect({}, {:?})", peer_id, set_id); self.disconnect_peer_inner(peer_id, set_id); } /// Inner implementation of `disconnect_peer`. - fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: crate::peerset::SetId) { + fn disconnect_peer_inner(&mut self, peer_id: &PeerId, set_id: SetId) { let mut entry = if let Entry::Occupied(entry) = self.peers.entry((*peer_id, set_id)) { entry } else { @@ -441,7 +441,7 @@ impl Notifications { // DisabledPendingEnable => Disabled. 
PeerState::DisabledPendingEnable { connections, timer_deadline, timer: _ } => { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, *peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id); *entry.into_mut() = PeerState::Disabled { connections, backoff_until: Some(timer_deadline) } }, @@ -451,7 +451,7 @@ impl Notifications { // If relevant, the external API is instantly notified. PeerState::Enabled { mut connections } => { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, *peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(set_id)].dropped(*peer_id); if connections.iter().any(|(_, s)| matches!(s, ConnectionState::Open(_))) { trace!(target: "sub-libp2p", "External API <= Closed({}, {:?})", peer_id, set_id); @@ -538,22 +538,8 @@ impl Notifications { } } - /// Returns the list of reserved peers. - pub fn reserved_peers( - &self, - set_id: crate::peerset::SetId, - pending_response: oneshot::Sender>, - ) { - self.peerset.reserved_peers(set_id, pending_response); - } - - /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&mut self) -> serde_json::Value { - self.peerset.debug_info() - } - /// Function that is called when the peerset wants us to connect to a peer. - fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: crate::peerset::SetId) { + fn peerset_report_connect(&mut self, peer_id: PeerId, set_id: SetId) { // If `PeerId` is unknown to us, insert an entry, start dialing, and return early. let mut occ_entry = match self.peers.entry((peer_id, set_id)) { Entry::Occupied(entry) => entry, @@ -731,7 +717,7 @@ impl Notifications { } /// Function that is called when the peerset wants us to disconnect from a peer. - fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: crate::peerset::SetId) { + fn peerset_report_disconnect(&mut self, peer_id: PeerId, set_id: SetId) { let mut entry = match self.peers.entry((peer_id, set_id)) { Entry::Occupied(entry) => entry, Entry::Vacant(entry) => { @@ -839,7 +825,7 @@ impl Notifications { /// Function that is called when the peerset wants us to accept a connection /// request from a peer. - fn peerset_report_accept(&mut self, index: crate::peerset::IncomingIndex) { + fn peerset_report_accept(&mut self, index: IncomingIndex) { let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) { self.incoming.remove(pos) @@ -857,7 +843,8 @@ impl Notifications { _ => { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", incoming.peer_id, incoming.set_id); - self.peerset.dropped(incoming.set_id, incoming.peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(incoming.set_id)] + .dropped(incoming.peer_id); }, } return @@ -925,7 +912,7 @@ impl Notifications { } /// Function that is called when the peerset wants us to reject an incoming peer. - fn peerset_report_reject(&mut self, index: crate::peerset::IncomingIndex) { + fn peerset_report_reject(&mut self, index: IncomingIndex) { let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) { self.incoming.remove(pos) @@ -1059,7 +1046,7 @@ impl NetworkBehaviour for Notifications { connection_id, .. 
}) => { - for set_id in (0..self.notif_protocols.len()).map(crate::peerset::SetId::from) { + for set_id in (0..self.notif_protocols.len()).map(SetId::from) { match self.peers.entry((peer_id, set_id)).or_insert(PeerState::Poisoned) { // Requested | PendingRequest => Enabled st @ &mut PeerState::Requested | @@ -1113,7 +1100,7 @@ impl NetworkBehaviour for Notifications { } }, FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => { - for set_id in (0..self.notif_protocols.len()).map(crate::peerset::SetId::from) { + for set_id in (0..self.notif_protocols.len()).map(SetId::from) { let mut entry = if let Entry::Occupied(entry) = self.peers.entry((peer_id, set_id)) { @@ -1195,7 +1182,8 @@ impl NetworkBehaviour for Notifications { if connections.is_empty() { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(set_id)] + .dropped(peer_id); *entry.get_mut() = PeerState::Backoff { timer, timer_deadline }; } else { *entry.get_mut() = PeerState::DisabledPendingEnable { @@ -1349,7 +1337,8 @@ impl NetworkBehaviour for Notifications { if connections.is_empty() { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(set_id)] + .dropped(peer_id); let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng()); let delay_id = self.next_delay_id; @@ -1371,7 +1360,8 @@ impl NetworkBehaviour for Notifications { matches!(s, ConnectionState::Opening | ConnectionState::Open(_)) }) { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(set_id)] + .dropped(peer_id); *entry.get_mut() = PeerState::Disabled { connections, backoff_until: None }; @@ -1406,7 +1396,7 @@ impl NetworkBehaviour for Notifications { if let Some(peer_id) = peer_id { trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); - for set_id in (0..self.notif_protocols.len()).map(crate::peerset::SetId::from) { + for set_id in (0..self.notif_protocols.len()).map(SetId::from) { if let Entry::Occupied(mut entry) = self.peers.entry((peer_id, set_id)) { match mem::replace(entry.get_mut(), PeerState::Poisoned) { // The peer is not in our list. @@ -1419,7 +1409,8 @@ impl NetworkBehaviour for Notifications { st @ PeerState::Requested | st @ PeerState::PendingRequest { .. 
} => { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, peer_id, DropReason::Unknown); + self.protocol_controller_handles[usize::from(set_id)] + .dropped(peer_id); let now = Instant::now(); let ban_duration = match st { @@ -1486,7 +1477,7 @@ impl NetworkBehaviour for Notifications { ) { match event { NotifsHandlerOut::OpenDesiredByRemote { protocol_index } => { - let set_id = crate::peerset::SetId::from(protocol_index); + let set_id = SetId::from(protocol_index); trace!(target: "sub-libp2p", "Handler({:?}, {:?}]) => OpenDesiredByRemote({:?})", @@ -1592,7 +1583,8 @@ impl NetworkBehaviour for Notifications { trace!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}, {:?}).", peer_id, set_id, incoming_id); - self.peerset.incoming(set_id, peer_id, incoming_id); + self.protocol_controller_handles[usize::from(set_id)] + .incoming_connection(peer_id, incoming_id); self.incoming.push(IncomingPeer { peer_id, set_id, @@ -1676,7 +1668,7 @@ impl NetworkBehaviour for Notifications { }, NotifsHandlerOut::CloseDesired { protocol_index } => { - let set_id = crate::peerset::SetId::from(protocol_index); + let set_id = SetId::from(protocol_index); trace!(target: "sub-libp2p", "Handler({}, {:?}) => CloseDesired({:?})", @@ -1748,7 +1740,8 @@ impl NetworkBehaviour for Notifications { .any(|(_, s)| matches!(s, ConnectionState::Opening)) { trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, peer_id, DropReason::Refused); + self.protocol_controller_handles[usize::from(set_id)] + .dropped(peer_id); *entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }; } else { @@ -1776,7 +1769,7 @@ impl NetworkBehaviour for Notifications { }, NotifsHandlerOut::CloseResult { protocol_index } => { - let set_id = crate::peerset::SetId::from(protocol_index); + let set_id = SetId::from(protocol_index); trace!(target: "sub-libp2p", "Handler({}, {:?}) => CloseResult({:?})", @@ -1816,7 +1809,7 @@ impl NetworkBehaviour for Notifications { inbound, .. } => { - let set_id = crate::peerset::SetId::from(protocol_index); + let set_id = SetId::from(protocol_index); trace!(target: "sub-libp2p", "Handler({}, {:?}) => OpenResultOk({:?})", peer_id, connection_id, set_id); @@ -1883,7 +1876,7 @@ impl NetworkBehaviour for Notifications { }, NotifsHandlerOut::OpenResultErr { protocol_index } => { - let set_id = crate::peerset::SetId::from(protocol_index); + let set_id = SetId::from(protocol_index); trace!(target: "sub-libp2p", "Handler({:?}, {:?}) => OpenResultErr({:?})", peer_id, connection_id, set_id); @@ -1924,7 +1917,7 @@ impl NetworkBehaviour for Notifications { matches!(s, ConnectionState::Opening | ConnectionState::Open(_)) }) { trace!(target: "sub-libp2p", "PSM <= Dropped({:?}, {:?})", peer_id, set_id); - self.peerset.dropped(set_id, peer_id, DropReason::Refused); + self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id); let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng()); *entry.into_mut() = PeerState::Disabled { @@ -1972,7 +1965,7 @@ impl NetworkBehaviour for Notifications { }, NotifsHandlerOut::Notification { protocol_index, message } => { - let set_id = crate::peerset::SetId::from(protocol_index); + let set_id = SetId::from(protocol_index); if self.is_open(&peer_id, set_id) { trace!( target: "sub-libp2p", @@ -2014,24 +2007,26 @@ impl NetworkBehaviour for Notifications { return Poll::Ready(event) } - // Poll for instructions from the peerset. 
- // Note that the peerset is a *best effort* crate, and we have to use defensive programming. + // Poll for instructions from the protocol controllers. loop { - match futures::Stream::poll_next(Pin::new(&mut self.peerset), cx) { - Poll::Ready(Some(crate::peerset::Message::Accept(index))) => { + match futures::Stream::poll_next(Pin::new(&mut self.from_protocol_controllers), cx) { + Poll::Ready(Some(Message::Accept(index))) => { self.peerset_report_accept(index); }, - Poll::Ready(Some(crate::peerset::Message::Reject(index))) => { + Poll::Ready(Some(Message::Reject(index))) => { self.peerset_report_reject(index); }, - Poll::Ready(Some(crate::peerset::Message::Connect { peer_id, set_id, .. })) => { + Poll::Ready(Some(Message::Connect { peer_id, set_id, .. })) => { self.peerset_report_connect(peer_id, set_id); }, - Poll::Ready(Some(crate::peerset::Message::Drop { peer_id, set_id, .. })) => { + Poll::Ready(Some(Message::Drop { peer_id, set_id, .. })) => { self.peerset_report_disconnect(peer_id, set_id); }, Poll::Ready(None) => { - error!(target: "sub-libp2p", "Peerset receiver stream has returned None"); + error!( + target: "sub-libp2p", + "Protocol controllers receiver stream has returned `None`. Ignore this error if the node is shutting down.", + ); break }, Poll::Pending => break, @@ -2108,7 +2103,12 @@ impl NetworkBehaviour for Notifications { #[allow(deprecated)] mod tests { use super::*; - use crate::{peerset::IncomingIndex, protocol::notifications::handler::tests::*}; + use crate::{ + mock::MockPeerStore, + protocol::notifications::handler::tests::*, + protocol_controller::{IncomingIndex, ProtoSetConfig, ProtocolController}, + }; + use sc_utils::mpsc::tracing_unbounded; use std::{collections::HashSet, iter}; impl PartialEq for ConnectionState { @@ -2137,24 +2137,26 @@ mod tests { } } - fn development_notifs() -> (Notifications, crate::peerset::PeersetHandle) { - let (peerset, peerset_handle) = { - let mut sets = Vec::with_capacity(1); + fn development_notifs() -> (Notifications, ProtocolController) { + let (to_notifications, from_controller) = + tracing_unbounded("test_controller_to_notifications", 10_000); - sets.push(crate::peerset::SetConfig { + let (handle, controller) = ProtocolController::new( + SetId::from(0), + ProtoSetConfig { in_peers: 25, out_peers: 25, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, - }); - - crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig { sets }) - }; + }, + to_notifications, + Box::new(MockPeerStore {}), + ); ( Notifications::new( - peerset, + vec![handle], + from_controller, iter::once(ProtocolConfig { name: "/foo".into(), fallback_names: Vec::new(), @@ -2162,13 +2164,13 @@ mod tests { max_notification_size: u64::MAX, }), ), - peerset_handle, + controller, ) } #[test] fn update_handshake() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let inner = notif.notif_protocols.get_mut(0).unwrap().handshake.read().clone(); assert_eq!(inner, vec![1, 2, 3, 4]); @@ -2183,14 +2185,14 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn update_unknown_handshake() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); notif.set_notif_protocol_handshake(1337.into(), vec![5, 6, 7, 8]); } #[test] fn disconnect_backoff_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); notif.peers.insert( @@ -2207,7 +2209,7 @@ 
mod tests { #[test] fn disconnect_pending_request() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); notif.peers.insert( @@ -2224,7 +2226,7 @@ mod tests { #[test] fn disconnect_requested_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); notif.peers.insert((peer, 0.into()), PeerState::Requested); @@ -2235,7 +2237,7 @@ mod tests { #[test] fn disconnect_disabled_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); notif.peers.insert( (peer, 0.into()), @@ -2251,7 +2253,7 @@ mod tests { #[test] fn remote_opens_connection_and_substream() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -2295,13 +2297,13 @@ mod tests { assert!(std::matches!( notif.incoming.pop(), - Some(IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. }), + Some(IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }), )); } #[tokio::test] - async fn disconnect_remote_substream_before_handled_by_peerset() { - let (mut notif, _peerset) = development_notifs(); + async fn disconnect_remote_substream_before_handled_by_controller() { + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -2337,8 +2339,8 @@ mod tests { #[test] fn peerset_report_connect_backoff() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -2402,10 +2404,10 @@ mod tests { #[test] fn peerset_connect_incoming() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2438,8 +2440,8 @@ mod tests { #[test] fn peerset_disconnect_disable_pending_enable() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -2485,10 +2487,10 @@ mod tests { #[test] fn peerset_disconnect_enabled() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2521,9 +2523,9 @@ mod tests { #[test] fn peerset_disconnect_requested() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = 
crate::peerset::SetId::from(0); + let set_id = SetId::from(0); // Set peer into `Requested` state. notif.peerset_report_connect(peer, set_id); @@ -2536,8 +2538,8 @@ mod tests { #[test] fn peerset_disconnect_pending_request() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -2589,10 +2591,10 @@ mod tests { #[test] fn peerset_accept_peer_not_alive() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2619,28 +2621,28 @@ mod tests { assert!(std::matches!( notif.incoming[0], - IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. }, + IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }, )); notif.disconnect_peer(&peer, set_id); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Disabled { .. }))); assert!(std::matches!( notif.incoming[0], - IncomingPeer { alive: false, incoming_id: crate::peerset::IncomingIndex(0), .. }, + IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. }, )); - notif.peerset_report_accept(crate::peerset::IncomingIndex(0)); + notif.peerset_report_accept(IncomingIndex(0)); assert_eq!(notif.incoming.len(), 0); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(PeerState::Disabled { .. }))); } #[test] fn secondary_connection_peer_state_incoming() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2691,10 +2693,10 @@ mod tests { #[test] fn close_connection_for_disabled_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2725,10 +2727,10 @@ mod tests { #[test] fn close_connection_for_incoming_peer_one_connection() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2764,17 +2766,17 @@ mod tests { assert!(notif.peers.get(&(peer, set_id)).is_none()); assert!(std::matches!( notif.incoming[0], - IncomingPeer { alive: false, incoming_id: crate::peerset::IncomingIndex(0), .. }, + IncomingPeer { alive: false, incoming_id: IncomingIndex(0), .. 
}, )); } #[test] fn close_connection_for_incoming_peer_two_connections() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let conn1 = ConnectionId::new_unchecked(1); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2839,10 +2841,10 @@ mod tests { #[test] fn connection_and_substream_open() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2870,7 +2872,7 @@ mod tests { // We rely on the implementation detail that incoming indices are counted // from 0 to not mock the `Peerset`. - notif.peerset_report_accept(crate::peerset::IncomingIndex(0)); + notif.peerset_report_accept(IncomingIndex(0)); assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. }))); // open new substream @@ -2893,11 +2895,11 @@ mod tests { #[test] fn connection_closed_sink_replaced() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn1 = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -2959,7 +2961,6 @@ mod tests { // check peer information assert_eq!(notif.open_peers().collect::>(), vec![&peer],); - assert_eq!(notif.num_discovered_peers(), 0usize); // close the other connection and verify that notification replacement event is emitted notif.on_swarm_event(FromSwarm::ConnectionClosed( @@ -2988,9 +2989,9 @@ mod tests { #[test] fn dial_failure_for_requested_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); // Set peer into `Requested` state. 
notif.peerset_report_connect(peer, set_id); @@ -3011,10 +3012,10 @@ mod tests { #[tokio::test] async fn write_notification() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3060,8 +3061,8 @@ mod tests { #[test] fn peerset_report_connect_backoff_expired() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -3108,9 +3109,9 @@ mod tests { #[test] fn peerset_report_disconnect_disabled() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3134,8 +3135,8 @@ mod tests { #[test] fn peerset_report_disconnect_backoff() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -3180,8 +3181,8 @@ mod tests { #[test] fn peer_is_backed_off_if_both_connections_get_closed_while_peer_is_disabled_with_back_off() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn1 = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); @@ -3253,9 +3254,9 @@ mod tests { #[test] fn inject_connection_closed_incoming_with_backoff() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3304,11 +3305,11 @@ mod tests { #[test] fn two_connections_inactive_connection_gets_closed_peer_state_is_still_incoming() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn1 = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3359,11 +3360,11 @@ mod tests { #[test] fn two_connections_active_connection_gets_closed_peer_state_is_disabled() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn1 = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { 
local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3417,11 +3418,11 @@ mod tests { #[test] fn inject_connection_closed_for_active_connection() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn1 = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3485,8 +3486,8 @@ mod tests { #[test] fn inject_dial_failure_for_pending_request() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -3548,9 +3549,9 @@ mod tests { #[test] fn peerstate_incoming_open_desired_by_remote() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let conn1 = ConnectionId::new_unchecked(0); let conn2 = ConnectionId::new_unchecked(1); let connected = ConnectedPoint::Listener { @@ -3602,9 +3603,9 @@ mod tests { #[tokio::test] async fn remove_backoff_peer_after_timeout() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -3680,10 +3681,10 @@ mod tests { #[tokio::test] async fn reschedule_disabled_pending_enable_when_connection_not_closed() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3798,10 +3799,10 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn peerset_report_connect_with_enabled_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -3848,8 +3849,8 @@ mod tests { #[test] #[cfg(debug_assertions)] fn peerset_report_connect_with_disabled_pending_enable_peer() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -3894,9 +3895,9 @@ mod tests { #[test] #[cfg(debug_assertions)] fn peerset_report_connect_with_requested_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + 
let set_id = SetId::from(0); // Set peer into `Requested` state. notif.peerset_report_connect(peer, set_id); @@ -3910,8 +3911,8 @@ mod tests { #[test] #[cfg(debug_assertions)] fn peerset_report_connect_with_pending_requested() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -3967,9 +3968,9 @@ mod tests { #[test] #[cfg(debug_assertions)] fn peerset_report_connect_with_incoming_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4002,9 +4003,9 @@ mod tests { #[test] #[cfg(debug_assertions)] fn peerset_report_disconnect_with_incoming_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4038,10 +4039,10 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn peerset_report_accept_incoming_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4068,21 +4069,21 @@ mod tests { assert!(std::matches!( notif.incoming[0], - IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. }, + IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }, )); notif.peers.remove(&(peer, set_id)); - notif.peerset_report_accept(crate::peerset::IncomingIndex(0)); + notif.peerset_report_accept(IncomingIndex(0)); } #[test] #[should_panic] #[cfg(debug_assertions)] fn peerset_report_accept_not_incoming_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4110,7 +4111,7 @@ mod tests { assert!(std::matches!( notif.incoming[0], - IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. }, + IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. }, )); notif.peerset_report_connect(peer, set_id); @@ -4121,14 +4122,14 @@ mod tests { assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Enabled { .. 
}))); notif.incoming[0].alive = true; - notif.peerset_report_accept(crate::peerset::IncomingIndex(0)); + notif.peerset_report_accept(IncomingIndex(0)); } #[test] #[should_panic] #[cfg(debug_assertions)] fn inject_connection_closed_non_existent_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let endpoint = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), @@ -4148,9 +4149,9 @@ mod tests { #[test] fn disconnect_non_existent_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); notif.peerset_report_disconnect(peer, set_id); @@ -4160,7 +4161,7 @@ mod tests { #[test] fn accept_non_existent_connection() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); notif.peerset_report_accept(0.into()); @@ -4170,7 +4171,7 @@ mod tests { #[test] fn reject_non_existent_connection() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); notif.peerset_report_reject(0.into()); @@ -4180,10 +4181,10 @@ mod tests { #[test] fn reject_non_active_connection() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4218,10 +4219,10 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn reject_non_existent_peer_but_alive_connection() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4247,7 +4248,7 @@ mod tests { assert!(std::matches!(notif.peers.get(&(peer, set_id)), Some(&PeerState::Incoming { .. }))); assert!(std::matches!( notif.incoming[0], - IncomingPeer { alive: true, incoming_id: crate::peerset::IncomingIndex(0), .. }, + IncomingPeer { alive: true, incoming_id: IncomingIndex(0), .. 
}, )); notif.peers.remove(&(peer, set_id)); @@ -4258,10 +4259,10 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn inject_non_existent_connection_closed_for_incoming_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4301,8 +4302,8 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn inject_non_existent_connection_closed_for_disabled_peer() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -4336,8 +4337,8 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn inject_non_existent_connection_closed_for_disabled_pending_enable() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -4387,10 +4388,10 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn inject_connection_closed_for_incoming_peer_state_mismatch() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4431,10 +4432,10 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn inject_connection_closed_for_enabled_state_mismatch() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); - let set_id = crate::peerset::SetId::from(0); + let set_id = SetId::from(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), send_back_addr: Multiaddr::empty(), @@ -4478,8 +4479,8 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn inject_connection_closed_for_backoff_peer() { - let (mut notif, _peerset) = development_notifs(); - let set_id = crate::peerset::SetId::from(0); + let (mut notif, _controller) = development_notifs(); + let set_id = SetId::from(0); let peer = PeerId::random(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { @@ -4532,7 +4533,7 @@ mod tests { #[should_panic] #[cfg(debug_assertions)] fn open_result_ok_non_existent_peer() { - let (mut notif, _peerset) = development_notifs(); + let (mut notif, _controller) = development_notifs(); let conn = ConnectionId::new_unchecked(0); let connected = ConnectedPoint::Listener { local_addr: Multiaddr::empty(), diff --git a/client/network/src/protocol/notifications/tests.rs b/client/network/src/protocol/notifications/tests.rs index ddeea495a2737..ebed9d63049e4 100644 --- a/client/network/src/protocol/notifications/tests.rs +++ b/client/network/src/protocol/notifications/tests.rs @@ -18,9 +18,13 @@ #![cfg(test)] -use 
crate::protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig}; +use crate::{ + peer_store::PeerStore, + protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig}, + protocol_controller::{ProtoSetConfig, ProtocolController, SetId}, +}; -use futures::prelude::*; +use futures::{future::BoxFuture, prelude::*}; use libp2p::{ core::{transport::MemoryTransport, upgrade, Endpoint}, identity, noise, @@ -31,6 +35,7 @@ use libp2p::{ }, yamux, Multiaddr, PeerId, Transport, }; +use sc_utils::mpsc::tracing_unbounded; use std::{ iter, pin::Pin, @@ -65,28 +70,31 @@ fn build_nodes() -> (Swarm, Swarm) { .timeout(Duration::from_secs(20)) .boxed(); - let (peerset, handle) = - crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig { - sets: vec![crate::peerset::SetConfig { - in_peers: 25, - out_peers: 25, - bootnodes: if index == 0 { - keypairs - .iter() - .skip(1) - .map(|keypair| keypair.public().to_peer_id()) - .collect() - } else { - vec![] - }, - reserved_nodes: Default::default(), - reserved_only: false, - }], - }); + let peer_store = PeerStore::new(if index == 0 { + keypairs.iter().skip(1).map(|keypair| keypair.public().to_peer_id()).collect() + } else { + vec![] + }); + + let (to_notifications, from_controller) = + tracing_unbounded("test_protocol_controller_to_notifications", 10_000); + + let (controller_handle, controller) = ProtocolController::new( + SetId::from(0), + ProtoSetConfig { + in_peers: 25, + out_peers: 25, + reserved_nodes: Default::default(), + reserved_only: false, + }, + to_notifications, + Box::new(peer_store.handle()), + ); let behaviour = CustomProtoWithAddr { inner: Notifications::new( - peerset, + vec![controller_handle], + from_controller, iter::once(ProtocolConfig { name: "/foo".into(), fallback_names: Vec::new(), @@ -94,7 +102,8 @@ fn build_nodes() -> (Swarm, Swarm) { max_notification_size: 1024 * 1024, }), ), - _peerset_handle: handle, + peer_store_future: peer_store.run().boxed(), + protocol_controller_future: controller.run().boxed(), addrs: addrs .iter() .enumerate() @@ -130,8 +139,8 @@ fn build_nodes() -> (Swarm, Swarm) { /// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. struct CustomProtoWithAddr { inner: Notifications, - // We need to keep `PeersetHandle` for `Peerset` not to shut down. 
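Note that the removed `_peerset_handle` keep-alive has no direct replacement: what keeps the new machinery alive is that the `PeerStore::run()` and `ProtocolController::run()` futures get driven, either spawned on an executor or, as in this test wrapper, polled from `poll()`. A condensed sketch of the peer-store side, using the constructors shown above and assuming `handle()` returns the `PeerStoreHandle` type used elsewhere in this patch; bootnode knowledge now seeds the store rather than a per-set `bootnodes` list.

use crate::peer_store::{PeerStore, PeerStoreHandle};
use futures::{future::BoxFuture, FutureExt};
use libp2p::PeerId;

/// Builds a peer store plus the future that must be kept running for it to work.
fn build_peer_store(bootnodes: Vec<PeerId>) -> (PeerStoreHandle, BoxFuture<'static, ()>) {
    // Known peers (previously `SetConfig::bootnodes`) seed the store directly.
    let peer_store = PeerStore::new(bootnodes);
    // Controllers receive a cloneable handle implementing `PeerStoreProvider`;
    // the store itself is an event loop that has to be polled to completion.
    let handle = peer_store.handle();
    (handle, peer_store.run().boxed())
}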
- _peerset_handle: crate::peerset::PeersetHandle, + peer_store_future: BoxFuture<'static, ()>, + protocol_controller_future: BoxFuture<'static, ()>, addrs: Vec<(PeerId, Multiaddr)>, } @@ -230,6 +239,9 @@ impl NetworkBehaviour for CustomProtoWithAddr { cx: &mut Context, params: &mut impl PollParameters, ) -> Poll>> { + let _ = self.peer_store_future.poll_unpin(cx); + let _ = self.protocol_controller_future.poll_unpin(cx); + self.inner.poll(cx, params) } } @@ -272,10 +284,9 @@ fn reconnect_after_disconnect() { ServiceState::NotConnected => { service1_state = ServiceState::FirstConnec; if service2_state == ServiceState::FirstConnec { - service1.behaviour_mut().disconnect_peer( - Swarm::local_peer_id(&service2), - crate::peerset::SetId::from(0), - ); + service1 + .behaviour_mut() + .disconnect_peer(Swarm::local_peer_id(&service2), SetId::from(0)); } }, ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain, @@ -295,10 +306,9 @@ fn reconnect_after_disconnect() { ServiceState::NotConnected => { service2_state = ServiceState::FirstConnec; if service1_state == ServiceState::FirstConnec { - service1.behaviour_mut().disconnect_peer( - Swarm::local_peer_id(&service2), - crate::peerset::SetId::from(0), - ); + service1 + .behaviour_mut() + .disconnect_peer(Swarm::local_peer_id(&service2), SetId::from(0)); } }, ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain, diff --git a/client/network/src/protocol_controller.rs b/client/network/src/protocol_controller.rs index a7190cc639f01..c9baa0a77d4ba 100644 --- a/client/network/src/protocol_controller.rs +++ b/client/network/src/protocol_controller.rs @@ -52,14 +52,94 @@ use std::{ }; use wasm_timer::Delay; -use crate::{ - peer_store::PeerStoreProvider, - peerset::{IncomingIndex, Message, SetConfig, SetId}, -}; +use crate::peer_store::PeerStoreProvider; /// Log target for this file. pub const LOG_TARGET: &str = "peerset"; +/// `Notifications` protocol index. For historical reasons it's called `SetId`, because it +/// used to refer to a set of peers in a peerset for this protocol. +/// +/// Can be constructed using the `From` trait implementation based on the index of the +/// protocol in `Notifications`. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct SetId(usize); + +impl SetId { + /// Const conversion function for initialization of hardcoded peerset indices. + pub const fn from(id: usize) -> Self { + Self(id) + } +} + +impl From for SetId { + fn from(id: usize) -> Self { + Self(id) + } +} + +impl From for usize { + fn from(id: SetId) -> Self { + id.0 + } +} + +/// Configuration for a set of nodes for a specific protocol. +#[derive(Debug)] +pub struct ProtoSetConfig { + /// Maximum number of incoming links to peers. + pub in_peers: u32, + + /// Maximum number of outgoing links to peers. + pub out_peers: u32, + + /// Lists of nodes we should always be connected to. + /// + /// > **Note**: Keep in mind that the networking has to know an address for these nodes, + /// > otherwise it will not be able to connect to them. + pub reserved_nodes: HashSet, + + /// If true, we only accept nodes in [`ProtoSetConfig::reserved_nodes`]. + pub reserved_only: bool, +} + +/// Message that is sent by [`ProtocolController`] to `Notifications`. +#[derive(Debug, PartialEq)] +pub enum Message { + /// Request to open a connection to the given peer. From the point of view of the + /// `ProtocolController`, we are immediately connected. + Connect { + /// Set id to connect on. 
+ set_id: SetId, + /// Peer to connect to. + peer_id: PeerId, + }, + + /// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`. + Drop { + /// Set id to disconnect on. + set_id: SetId, + /// Peer to disconnect from. + peer_id: PeerId, + }, + + /// Equivalent to `Connect` for the peer corresponding to this incoming index. + Accept(IncomingIndex), + + /// Equivalent to `Drop` for the peer corresponding to this incoming index. + Reject(IncomingIndex), +} + +/// Opaque identifier for an incoming connection. Allocated by the network. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct IncomingIndex(pub u64); + +impl From for IncomingIndex { + fn from(val: u64) -> Self { + Self(val) + } +} + /// External API actions. #[derive(Debug)] enum Action { @@ -179,8 +259,7 @@ impl Default for PeerState { } } -/// Side of [`ProtocolHandle`] responsible for all the logic. Currently all instances are -/// owned by [`crate::Peerset`], but they should eventually be moved to corresponding protocols. +/// Worker side of [`ProtocolHandle`] responsible for all the logic. #[derive(Debug)] pub struct ProtocolController { /// Set id to use when sending connect/drop requests to `Notifications`. @@ -217,7 +296,7 @@ impl ProtocolController { /// Construct new [`ProtocolController`]. pub fn new( set_id: SetId, - config: SetConfig, + config: ProtoSetConfig, to_notifications: TracingUnboundedSender, peer_store: Box, ) -> (ProtocolHandle, ProtocolController) { @@ -758,12 +837,8 @@ impl ProtocolController { #[cfg(test)] mod tests { - use super::{Direction, PeerState, ProtocolController, ProtocolHandle}; - use crate::{ - peer_store::PeerStoreProvider, - peerset::{IncomingIndex, Message, SetConfig, SetId}, - ReputationChange, - }; + use super::*; + use crate::{peer_store::PeerStoreProvider, ReputationChange}; use libp2p::PeerId; use sc_utils::mpsc::{tracing_unbounded, TryRecvError}; use std::collections::HashSet; @@ -788,10 +863,9 @@ mod tests { let reserved2 = PeerId::random(); // Add first reserved node via config. - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, out_peers: 0, - bootnodes: Vec::new(), reserved_nodes: std::iter::once(reserved1).collect(), reserved_only: true, }; @@ -851,10 +925,9 @@ mod tests { let reserved2 = PeerId::random(); // Add first reserved node via config. - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, out_peers: 0, - bootnodes: Vec::new(), reserved_nodes: std::iter::once(reserved1).collect(), reserved_only: true, }; @@ -903,10 +976,9 @@ mod tests { let reserved2 = PeerId::random(); // Add first reserved node via config. - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, out_peers: 0, - bootnodes: Vec::new(), reserved_nodes: std::iter::once(reserved1).collect(), reserved_only: true, }; @@ -962,11 +1034,10 @@ mod tests { let peer2 = PeerId::random(); let candidates = vec![peer1, peer2]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, // Less slots than candidates. 
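These conversion impls are what allow `Notifications` to keep one `ProtocolHandle` per protocol in a plain `Vec` and index it by set, and what the surrounding tests rely on when they write `SetId::from(0)` or `IncomingIndex(0)`. A few illustrative checks, using only the types defined in this file; the commented handle lookup mirrors the pattern used in `Notifications`.

use crate::protocol_controller::{IncomingIndex, SetId};

#[test]
fn set_id_and_incoming_index_conversions() {
    // `SetId::from` is `const`, so hardcoded protocol indices can live in constants.
    const SYNC_SET: SetId = SetId::from(0);
    assert_eq!(usize::from(SYNC_SET), 0);
    assert_eq!(SetId::from(2), 2usize.into());

    // Incoming connections are identified by a network-allocated `u64` index.
    assert_eq!(IncomingIndex::from(7u64), IncomingIndex(7));

    // Typical handle lookup done by `Notifications`:
    // self.protocol_controller_handles[usize::from(set_id)].dropped(peer_id);
}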
out_peers: 2, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1014,13 +1085,8 @@ mod tests { let outgoing_candidates = vec![regular1, regular2]; let reserved_nodes = [reserved1, reserved2].iter().cloned().collect(); - let config = SetConfig { - in_peers: 10, - out_peers: 10, - bootnodes: Vec::new(), - reserved_nodes, - reserved_only: false, - }; + let config = + ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false }; let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100); let mut peer_store = MockPeerStoreHandle::new(); @@ -1055,11 +1121,10 @@ mod tests { let candidates1 = vec![peer1, peer2]; let candidates2 = vec![peer3]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, // Less slots than candidates. out_peers: 2, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1126,11 +1191,10 @@ mod tests { #[test] fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() { - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, // Make sure we have slots available. out_peers: 2, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: true, }; @@ -1153,11 +1217,10 @@ mod tests { #[test] fn in_reserved_only_mode_no_regular_peers_are_accepted() { - let config = SetConfig { + let config = ProtoSetConfig { // Make sure we have slots available. in_peers: 2, out_peers: 0, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: true, }; @@ -1191,11 +1254,10 @@ mod tests { let peer2 = PeerId::random(); let candidates = vec![peer1, peer2]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 0, // Make sure we have slots available. out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: true, }; @@ -1239,10 +1301,9 @@ mod tests { let regular2 = PeerId::random(); let outgoing_candidates = vec![regular1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: [reserved1, reserved2].iter().cloned().collect(), reserved_only: false, }; @@ -1300,10 +1361,9 @@ mod tests { let reserved1 = PeerId::random(); let reserved2 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: [reserved1, reserved2].iter().cloned().collect(), reserved_only: false, }; @@ -1333,10 +1393,9 @@ mod tests { let reserved1 = PeerId::random(); let reserved2 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: [reserved1, reserved2].iter().cloned().collect(), reserved_only: true, }; @@ -1380,10 +1439,9 @@ mod tests { let peer1 = PeerId::random(); let peer2 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: [peer1, peer2].iter().cloned().collect(), reserved_only: false, }; @@ -1427,10 +1485,9 @@ mod tests { let peer2 = PeerId::random(); let outgoing_candidates = vec![peer1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1470,10 +1527,9 @@ mod tests { let peer2 = PeerId::random(); let outgoing_candidates = vec![peer1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - 
bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1530,10 +1586,9 @@ mod tests { let reserved1 = PeerId::random(); let reserved2 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: [reserved1, reserved2].iter().cloned().collect(), reserved_only: false, }; @@ -1587,10 +1642,9 @@ mod tests { let peer2 = PeerId::random(); let outgoing_candidates = vec![peer1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1640,10 +1694,9 @@ mod tests { let reserved1 = PeerId::random(); let reserved2 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: [reserved1, reserved2].iter().cloned().collect(), reserved_only: false, }; @@ -1701,10 +1754,9 @@ mod tests { let regular2 = PeerId::random(); let outgoing_candidates = vec![regular1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1754,10 +1806,9 @@ mod tests { let regular2 = PeerId::random(); let outgoing_candidates = vec![regular1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1808,10 +1859,9 @@ mod tests { let regular2 = PeerId::random(); let outgoing_candidates = vec![regular1]; - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 1, out_peers: 1, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1862,10 +1912,9 @@ mod tests { let peer1 = PeerId::random(); let peer2 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 1, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1893,10 +1942,9 @@ mod tests { fn banned_regular_incoming_node_is_rejected() { let peer1 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: HashSet::new(), reserved_only: false, }; @@ -1919,10 +1967,9 @@ mod tests { fn banned_reserved_incoming_node_is_rejected() { let reserved1 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: std::iter::once(reserved1).collect(), reserved_only: false, }; @@ -1946,10 +1993,9 @@ mod tests { fn we_dont_connect_to_banned_reserved_node() { let reserved1 = PeerId::random(); - let config = SetConfig { + let config = ProtoSetConfig { in_peers: 10, out_peers: 10, - bootnodes: Vec::new(), reserved_nodes: std::iter::once(reserved1).collect(), reserved_only: false, }; diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index c0754132a13a1..6fffaa8fa1413 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -35,7 +35,9 @@ //! is used to handle incoming requests. 
use crate::{ - peer_store::BANNED_THRESHOLD, peerset::PeersetHandle, types::ProtocolName, ReputationChange, + peer_store::{PeerStoreProvider, BANNED_THRESHOLD}, + types::ProtocolName, + ReputationChange, }; use futures::{channel::oneshot, prelude::*}; @@ -279,28 +281,7 @@ pub struct RequestResponsesBehaviour { send_feedback: HashMap>, /// Primarily used to get a reputation of a node. - peerset: PeersetHandle, - - /// Pending message request, holds `MessageRequest` as a Future state to poll it - /// until we get a response from `Peerset` - message_request: Option, -} - -// This is a state of processing incoming request Message. -// The main reason of this struct is to hold `get_peer_reputation` as a Future state. -struct MessageRequest { - peer: PeerId, - request_id: RequestId, - request: Vec, - channel: ResponseChannel, ()>>, - protocol: ProtocolName, - // A builder used for building responses for incoming requests. Note that we use - // `async_channel` and not `mpsc` on purpose, because `mpsc::channel` allocates an extra - // message slot for every cloned `Sender` and this breaks a back-pressure mechanism. - resp_builder: Option>, - // Once we get incoming request we save all params, create an async call to Peerset - // to get the reputation of the peer. - get_peer_reputation: Pin> + Send>>, + peer_store: Box, } /// Generated by the response builder and waiting to be processed. @@ -317,7 +298,7 @@ impl RequestResponsesBehaviour { /// the same protocol is passed twice. pub fn new( list: impl Iterator, - peerset: PeersetHandle, + peer_store: Box, ) -> Result { let mut protocols = HashMap::new(); for protocol in list { @@ -354,8 +335,7 @@ impl RequestResponsesBehaviour { pending_responses: Default::default(), pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), - peerset, - message_request: None, + peer_store, }) } @@ -576,96 +556,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour { params: &mut impl PollParameters, ) -> Poll>> { 'poll_all: loop { - if let Some(message_request) = self.message_request.take() { - // Now we can can poll `MessageRequest` until we get the reputation - - let MessageRequest { - peer, - request_id, - request, - channel, - protocol, - resp_builder, - mut get_peer_reputation, - } = message_request; - - let reputation = Future::poll(Pin::new(&mut get_peer_reputation), cx); - match reputation { - Poll::Pending => { - // Save the state to poll it again next time. - - self.message_request = Some(MessageRequest { - peer, - request_id, - request, - channel, - protocol, - resp_builder, - get_peer_reputation, - }); - return Poll::Pending - }, - Poll::Ready(reputation) => { - // Once we get the reputation we can continue processing the request. - - let reputation = reputation.expect( - "The channel can only be closed if the peerset no longer exists; qed", - ); - - if reputation < BANNED_THRESHOLD { - log::debug!( - target: "sub-libp2p", - "Cannot handle requests from a node with a low reputation {}: {}", - peer, - reputation, - ); - continue 'poll_all - } - - log::trace!(target: "sub-libp2p", "request received from {peer} ({protocol:?}), {} bytes", request.len()); - - let (tx, rx) = oneshot::channel(); - - // Submit the request to the "response builder" passed by the user at - // initialization. - if let Some(resp_builder) = resp_builder { - // If the response builder is too busy, silently drop `tx`. This - // will be reported by the corresponding request-response [`Behaviour`] - // through an `InboundFailure::Omission` event. 
- // Note that we use `async_channel::bounded` and not `mpsc::channel` - // because the latter allocates an extra slot for every cloned sender. - let _ = resp_builder.try_send(IncomingRequest { - peer, - payload: request, - pending_response: tx, - }); - } else { - debug_assert!(false, "Received message on outbound-only protocol."); - } - - self.pending_responses.push(Box::pin(async move { - // The `tx` created above can be dropped if we are not capable of - // processing this request, which is reflected as a - // `InboundFailure::Omission` event. - if let Ok(response) = rx.await { - Some(RequestProcessingOutcome { - peer, - request_id, - protocol, - inner_channel: channel, - response, - }) - } else { - None - } - })); - - // This `continue` makes sure that `pending_responses` gets polled - // after we have added the new element. - continue 'poll_all - }, - } - } // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { @@ -712,7 +602,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Poll request-responses protocols. for (protocol, (behaviour, resp_builder)) in &mut self.protocols { - while let Poll::Ready(ev) = behaviour.poll(cx, params) { + 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { let ev = match ev { // Main events we are interested in. ToSwarm::GenerateEvent(ev) => ev, @@ -756,23 +646,56 @@ impl NetworkBehaviour for RequestResponsesBehaviour { self.pending_responses_arrival_time .insert((protocol.clone(), request_id).into(), Instant::now()); - let get_peer_reputation = self.peerset.clone().peer_reputation(peer); - let get_peer_reputation = Box::pin(get_peer_reputation); + let reputation = self.peer_store.peer_reputation(&peer); - // Save the Future-like state with params to poll `get_peer_reputation` - // and to continue processing the request once we get the reputation of - // the peer. - self.message_request = Some(MessageRequest { - peer, - request_id, - request, - channel, - protocol: protocol.clone(), - resp_builder: resp_builder.clone(), - get_peer_reputation, - }); + if reputation < BANNED_THRESHOLD { + log::debug!( + target: "sub-libp2p", + "Cannot handle requests from a node with a low reputation {}: {}", + peer, + reputation, + ); + continue 'poll_protocol + } - // This `continue` makes sure that `message_request` gets polled + let (tx, rx) = oneshot::channel(); + + // Submit the request to the "response builder" passed by the user at + // initialization. + if let Some(resp_builder) = resp_builder { + // If the response builder is too busy, silently drop `tx`. This + // will be reported by the corresponding request-response + // [`Behaviour`] through an `InboundFailure::Omission` event. + // Note that we use `async_channel::bounded` and not `mpsc::channel` + // because the latter allocates an extra slot for every cloned + // sender. + let _ = resp_builder.try_send(IncomingRequest { + peer, + payload: request, + pending_response: tx, + }); + } else { + debug_assert!(false, "Received message on outbound-only protocol."); + } + + let protocol = protocol.clone(); + + self.pending_responses.push(Box::pin(async move { + // The `tx` created above can be dropped if we are not capable of + // processing this request, which is reflected as a + // `InboundFailure::Omission` event. 
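Since `PeerStoreProvider::peer_reputation` is a plain synchronous call, the `MessageRequest` future-state machine removed above collapses into a single comparison at the point where a request arrives. Reduced to a helper, with the trait and constant names taken from this patch and the exact trait signature inferred from its use here:

use crate::peer_store::{PeerStoreProvider, BANNED_THRESHOLD};
use libp2p::PeerId;

/// `true` if requests from `peer` should be processed rather than silently dropped.
fn accepts_requests_from(peer_store: &dyn PeerStoreProvider, peer: &PeerId) -> bool {
    // Peers whose reputation has fallen below the ban threshold are ignored:
    // their requests are simply never answered.
    peer_store.peer_reputation(peer) >= BANNED_THRESHOLD
}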
+ rx.await.map_or(None, |response| { + Some(RequestProcessingOutcome { + peer, + request_id, + protocol, + inner_channel: channel, + response, + }) + }) + })); + + // This `continue` makes sure that `pending_responses` gets polled // after we have added the new element. continue 'poll_all }, @@ -1064,7 +987,7 @@ impl Codec for GenericCodec { mod tests { use super::*; - use crate::peerset::{Peerset, PeersetConfig, SetConfig}; + use crate::mock::MockPeerStore; use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; use libp2p::{ core::{ @@ -1087,7 +1010,7 @@ mod tests { fn build_swarm( list: impl Iterator, - ) -> (Swarm, Multiaddr, Peerset) { + ) -> (Swarm, Multiaddr) { let keypair = Keypair::generate_ed25519(); let transport = MemoryTransport::new() @@ -1096,19 +1019,7 @@ mod tests { .multiplex(libp2p::yamux::Config::default()) .boxed(); - let config = PeersetConfig { - sets: vec![SetConfig { - in_peers: u32::max_value(), - out_peers: u32::max_value(), - bootnodes: vec![], - reserved_nodes: Default::default(), - reserved_only: false, - }], - }; - - let (peerset, handle) = Peerset::from_config(config); - - let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap(); + let behaviour = RequestResponsesBehaviour::new(list, Box::new(MockPeerStore {})).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); let mut swarm = SwarmBuilder::with_executor( @@ -1121,11 +1032,7 @@ mod tests { let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); swarm.listen_on(listen_addr.clone()).unwrap(); - (swarm, listen_addr, peerset) - } - - async fn loop_peerset(peerset: Peerset) { - let _: Vec<_> = peerset.collect().await; + (swarm, listen_addr) } #[test] @@ -1177,9 +1084,7 @@ mod tests { Swarm::dial(&mut swarms[0].0, dial_addr).unwrap(); } - let (mut swarm, _, peerset) = swarms.remove(0); - // Process every peerset event in the background. - pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); + let (mut swarm, _) = swarms.remove(0); // Running `swarm[0]` in the background. pool.spawner() .spawn_obj({ @@ -1199,9 +1104,7 @@ mod tests { .unwrap(); // Remove and run the remaining swarm. - let (mut swarm, _, peerset) = swarms.remove(0); - // Process every peerset event in the background. - pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); + let (mut swarm, _) = swarms.remove(0); pool.run_until(async move { let mut response_receiver = None; @@ -1280,9 +1183,7 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. - let (mut swarm, _, peerset) = swarms.remove(0); - // Process every peerset event in the background. - pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); + let (mut swarm, _) = swarms.remove(0); pool.spawner() .spawn_obj({ async move { @@ -1302,9 +1203,7 @@ mod tests { .unwrap(); // Remove and run the remaining swarm. - let (mut swarm, _, peerset) = swarms.remove(0); - // Process every peerset event in the background. 
- pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); + let (mut swarm, _) = swarms.remove(0); pool.run_until(async move { let mut response_receiver = None; @@ -1376,7 +1275,7 @@ mod tests { build_swarm(protocol_configs.into_iter()).0 }; - let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = { + let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = { let (tx_1, rx_1) = async_channel::bounded(64); let (tx_2, rx_2) = async_channel::bounded(64); @@ -1399,12 +1298,10 @@ mod tests { }, ]; - let (swarm, listen_addr, peerset) = build_swarm(protocol_configs.into_iter()); + let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter()); - (swarm, rx_1, rx_2, listen_addr, peerset) + (swarm, rx_1, rx_2, listen_addr) }; - // Process every peerset event in the background. - pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); // Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test, // so they wouldn't connect to each other. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 9d18bdfc6213a..533589eec96dc 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -36,8 +36,9 @@ use crate::{ network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, - peerset::PeersetHandle, + peer_store::{PeerStoreHandle, PeerStoreProvider}, protocol::{self, NotifsHandlerError, Protocol, Ready}, + protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId}, request_responses::{IfDisconnected, RequestFailure}, service::{ signature::{Signature, SigningError}, @@ -113,9 +114,6 @@ pub struct NetworkService { local_identity: Keypair, /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. bandwidth: Arc, - /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which - /// nodes it should be connected to or not. - peerset: PeersetHandle, /// Channel that sends messages to the actual worker. to_worker: TracingUnboundedSender, /// For each peer and protocol combination, an object that allows sending notifications to @@ -124,6 +122,14 @@ pub struct NetworkService { /// Field extracted from the [`Metrics`] struct and necessary to report the /// notifications-related metrics. notifications_sizes_metric: Option, + /// Protocol name -> `SetId` mapping for notification protocols. The map never changes after + /// initialization. + notification_protocol_ids: HashMap, + /// Handles to manage peer connections on notification protocols. The vector never changes + /// after initialization. + protocol_handles: Vec, + /// Shortcut to sync protocol handle (`protocol_handles[0]`). + sync_protocol_handle: protocol_controller::ProtocolHandle, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData, @@ -260,27 +266,93 @@ where ) }; - let (protocol, peerset_handle, mut known_addresses) = Protocol::new( + let (to_notifications, from_protocol_controllers) = + tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000); + + // We must prepend a hardcoded default peer set to notification protocols. 
+ let all_peer_sets_iter = iter::once(&network_config.default_peers_set) + .chain(notification_protocols.iter().map(|protocol| &protocol.set_config)); + + let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter + .enumerate() + .map(|(set_id, set_config)| { + let proto_set_config = ProtoSetConfig { + in_peers: set_config.in_peers, + out_peers: set_config.out_peers, + reserved_nodes: set_config + .reserved_nodes + .iter() + .map(|node| node.peer_id) + .collect(), + reserved_only: set_config.non_reserved_mode.is_reserved_only(), + }; + + ProtocolController::new( + SetId::from(set_id), + proto_set_config, + to_notifications.clone(), + Box::new(params.peer_store.clone()), + ) + }) + .unzip(); + + // Shortcut to default (sync) peer set protocol handle. + let sync_protocol_handle = protocol_handles[0].clone(); + + // Spawn `ProtocolController` runners. + protocol_controllers + .into_iter() + .for_each(|controller| (params.executor)(controller.run().boxed())); + + // Protocol name to protocol id mapping. The first protocol is always block announce (sync) + // protocol, aka default (hardcoded) peer set. + let notification_protocol_ids: HashMap = + iter::once(¶ms.block_announce_config) + .chain(notification_protocols.iter()) + .enumerate() + .map(|(index, protocol)| { + (protocol.notifications_protocol.clone(), SetId::from(index)) + }) + .collect(); + + let protocol = Protocol::new( From::from(¶ms.role), - &network_config, - notification_protocols, + notification_protocols.clone(), params.block_announce_config, + params.peer_store.clone(), + protocol_handles.clone(), + from_protocol_controllers, params.tx, )?; - // List of multiaddresses that we know in the network. - let mut boot_node_ids = HashMap::>::new(); + let known_addresses = { + // Collect all reserved nodes and bootnodes addresses. + let mut addresses: Vec<_> = network_config + .default_peers_set + .reserved_nodes + .iter() + .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone())) + .chain(notification_protocols.iter().flat_map(|protocol| { + protocol + .set_config + .reserved_nodes + .iter() + .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone())) + })) + .chain( + network_config + .boot_nodes + .iter() + .map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())), + ) + .collect(); - // Process the bootnodes. - for bootnode in network_config.boot_nodes.iter() { - boot_node_ids - .entry(bootnode.peer_id) - .or_default() - .push(bootnode.multiaddr.clone()); - known_addresses.push((bootnode.peer_id, bootnode.multiaddr.clone())); - } + // Remove possible duplicates. + addresses.sort(); + addresses.dedup(); - let boot_node_ids = Arc::new(boot_node_ids); + addresses + }; // Check for duplicate bootnodes. network_config.boot_nodes.iter().try_for_each(|bootnode| { @@ -300,6 +372,18 @@ where } })?; + // List of bootnode multiaddresses. 
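The two maps built here carry the invariant the rest of the service relies on: set index 0 is the hardcoded default (sync) set, and `notification_protocol_ids` maps a protocol name back to the `SetId` whose handle sits at that position in `protocol_handles`. The reserved-peer setters of `NetworkPeers` further below resolve their target handle exactly this way; as a hypothetical free-standing helper, that lookup would look roughly like the sketch below (the function itself is not part of this patch).

use crate::{
    protocol_controller::{ProtocolHandle, SetId},
    types::ProtocolName,
};
use std::collections::HashMap;

fn controller_handle<'a>(
    notification_protocol_ids: &HashMap<ProtocolName, SetId>,
    protocol_handles: &'a [ProtocolHandle],
    protocol: &ProtocolName,
) -> Result<&'a ProtocolHandle, String> {
    let set_id = notification_protocol_ids
        .get(protocol)
        .ok_or_else(|| format!("unknown notification protocol: {}", protocol))?;
    // Handles are ordered by `SetId`, with the hardcoded default (sync) set at index 0.
    Ok(&protocol_handles[usize::from(*set_id)])
}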
+ let mut boot_node_ids = HashMap::>::new(); + + for bootnode in network_config.boot_nodes.iter() { + boot_node_ids + .entry(bootnode.peer_id) + .or_default() + .push(bootnode.multiaddr.clone()); + } + + let boot_node_ids = Arc::new(boot_node_ids); + let num_connected = Arc::new(AtomicUsize::new(0)); let external_addresses = Arc::new(Mutex::new(HashSet::new())); @@ -349,7 +433,7 @@ where local_public, discovery_config, request_response_protocols, - peerset_handle.clone(), + params.peer_store.clone(), ConnectionLimits::default() .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32)) .with_max_established_incoming(Some( @@ -422,7 +506,6 @@ where external_addresses, listen_addresses: listen_addresses.clone(), num_connected: num_connected.clone(), - peerset: peerset_handle, local_peer_id, local_identity, to_worker, @@ -430,6 +513,9 @@ where notifications_sizes_metric: metrics .as_ref() .map(|metrics| metrics.notifications_sizes.clone()), + notification_protocol_ids, + protocol_handles, + sync_protocol_handle, _marker: PhantomData, _block: Default::default(), }); @@ -441,10 +527,11 @@ where service, from_service, event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, - peers_notifications_sinks, metrics, boot_node_ids, reported_invalid_boot_nodes: Default::default(), + peers_notifications_sinks, + peer_store_handle: params.peer_store, _marker: Default::default(), _block: Default::default(), }) @@ -601,7 +688,11 @@ where external_addresses, connected_peers, not_connected_peers, - peerset: swarm.behaviour_mut().user_protocol_mut().peerset_debug_info(), + // TODO: Check what info we can include here. + // Issue reference: https://github.com/paritytech/substrate/issues/14160. + peerset: serde_json::json!( + "Unimplemented. See https://github.com/paritytech/substrate/issues/14160." + ), } } @@ -614,14 +705,6 @@ where pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> { self.service.add_reserved_peer(peer) } - - /// Returns the list of reserved peers. - fn reserved_peers(&self, pending_response: oneshot::Sender>) { - self.network_service - .behaviour() - .user_protocol() - .reserved_peers(pending_response); - } } impl NetworkService { @@ -651,11 +734,9 @@ impl NetworkService { pub async fn reserved_peers(&self) -> Result, ()> { let (tx, rx) = oneshot::channel(); - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx }); + self.sync_protocol_handle.reserved_peers(tx); - // The channel can only be closed if the network worker no longer exists. + // The channel can only be closed if `ProtocolController` no longer exists. 
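	// Caller-side sketch of the round trip above: the request goes straight to the sync
	// set's `ProtocolController`, and the `rx.await` below resolves once it replies.
	// Generic bounds and log messages are illustrative.
	//
	//     async fn log_reserved<B, H>(network: &sc_network::NetworkService<B, H>)
	//     where
	//         B: sp_runtime::traits::Block + 'static,
	//         H: sc_network::ExHashT,
	//     {
	//         match network.reserved_peers().await {
	//             Ok(peers) => log::info!("reserved peers of the sync set: {peers:?}"),
	//             // `Err(())` means the controller (and the network) is shutting down.
	//             Err(()) => log::warn!("network service is shutting down"),
	//         }
	//     }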
rx.await.map_err(|_| ()) } @@ -767,13 +848,11 @@ where H: ExHashT, { fn set_authorized_peers(&self, peers: HashSet) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReserved(peers)); + self.sync_protocol_handle.set_reserved_peers(peers); } fn set_authorized_only(&self, reserved_only: bool) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); + self.sync_protocol_handle.set_reserved_only(reserved_only); } fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) { @@ -783,7 +862,7 @@ where } fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - self.peerset.report_peer(who, cost_benefit); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::ReportPeer(who, cost_benefit)); } fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { @@ -791,15 +870,15 @@ where } fn accept_unreserved_peers(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(false)); + self.sync_protocol_handle.set_reserved_only(false); } fn deny_unreserved_peers(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(true)); + self.sync_protocol_handle.set_reserved_only(true); } fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> { - // Make sure the local peer ID is never added to the PSM. + // Make sure the local peer ID is never added as a reserved peer. if peer.peer_id == self.local_peer_id { return Err("Local peer ID cannot be added as a reserved peer.".to_string()) } @@ -807,12 +886,12 @@ where let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer.peer_id, peer.multiaddr)); - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddReserved(peer.peer_id)); + self.sync_protocol_handle.add_reserved_peer(peer.peer_id); Ok(()) } fn remove_reserved_peer(&self, peer_id: PeerId) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RemoveReserved(peer_id)); + self.sync_protocol_handle.remove_reserved_peer(peer_id); } fn set_reserved_peers( @@ -820,6 +899,10 @@ where protocol: ProtocolName, peers: HashSet, ) -> Result<(), String> { + let Some(set_id) = self.notification_protocol_ids.get(&protocol) else { + return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol)) + }; + let peers_addrs = self.split_multiaddr_and_peer_id(peers)?; let mut peers: HashSet = HashSet::with_capacity(peers_addrs.len()); @@ -839,9 +922,7 @@ where } } - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::SetPeersetReserved(protocol, peers)); + self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers); Ok(()) } @@ -851,6 +932,12 @@ where protocol: ProtocolName, peers: HashSet, ) -> Result<(), String> { + let Some(set_id) = self.notification_protocol_ids.get(&protocol) else { + return Err( + format!("Cannot add peers to reserved set of unknown protocol: {}", protocol) + ) + }; + let peers = self.split_multiaddr_and_peer_id(peers)?; for (peer_id, addr) in peers.into_iter() { @@ -864,20 +951,29 @@ where .to_worker .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::AddSetReserved(protocol.clone(), peer_id)); + + self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id); } Ok(()) } - fn remove_peers_from_reserved_set(&self, protocol: ProtocolName, peers: Vec) { + fn remove_peers_from_reserved_set( + &self, + protocol: ProtocolName, + peers: Vec, + ) -> Result<(), String> { + let Some(set_id) = 
self.notification_protocol_ids.get(&protocol) else { + return Err( + format!("Cannot remove peers from reserved set of unknown protocol: {}", protocol) + ) + }; + for peer_id in peers.into_iter() { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::RemoveSetReserved(protocol.clone(), peer_id)); + self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id); } + + Ok(()) } fn sync_num_connected(&self) -> usize { @@ -1085,13 +1181,7 @@ enum ServiceToWorkerMsg { GetValue(KademliaKey), PutValue(KademliaKey, Vec), AddKnownAddress(PeerId, Multiaddr), - SetReservedOnly(bool), - AddReserved(PeerId), - RemoveReserved(PeerId), - SetReserved(HashSet), - SetPeersetReserved(ProtocolName, HashSet), - AddSetReserved(ProtocolName, PeerId), - RemoveSetReserved(ProtocolName, PeerId), + ReportPeer(PeerId, ReputationChange), EventStream(out_events::Sender), Request { target: PeerId, @@ -1108,9 +1198,6 @@ enum ServiceToWorkerMsg { }, DisconnectPeer(PeerId, ProtocolName), SetNotificationHandshake(ProtocolName, Vec), - ReservedPeers { - pending_response: oneshot::Sender>, - }, } /// Main network worker. Must be polled in order for the network to advance. @@ -1143,6 +1230,8 @@ where /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Shared with the [`NetworkService`]. peers_notifications_sinks: Arc>>, + /// Peer reputation store handle. + peer_store_handle: PeerStoreHandle, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData, @@ -1204,8 +1293,7 @@ where } metrics .peerset_num_discovered - .set(self.network_service.behaviour_mut().user_protocol().num_discovered_peers() - as u64); + .set(self.peer_store_handle.num_known_peers() as u64); metrics.pending_connections.set( Swarm::network_info(&self.network_service).connection_counters().num_pending() as u64, @@ -1222,43 +1310,10 @@ where self.network_service.behaviour_mut().get_value(key), ServiceToWorkerMsg::PutValue(key, value) => self.network_service.behaviour_mut().put_value(key, value), - ServiceToWorkerMsg::SetReservedOnly(reserved_only) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_reserved_only(reserved_only), - ServiceToWorkerMsg::SetReserved(peers) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_reserved_peers(peers), - ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_reserved_peerset_peers(protocol, peers), - ServiceToWorkerMsg::AddReserved(peer_id) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .add_reserved_peer(peer_id), - ServiceToWorkerMsg::RemoveReserved(peer_id) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .remove_reserved_peer(peer_id), - ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .add_set_reserved_peer(protocol, peer_id), - ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => self - .network_service - .behaviour_mut() - .user_protocol_mut() - .remove_set_reserved_peer(protocol, peer_id), ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => self.network_service.behaviour_mut().add_known_address(peer_id, addr), + ServiceToWorkerMsg::ReportPeer(peer_id, reputation_change) => + self.peer_store_handle.report_peer(peer_id, reputation_change), ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender), 
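				// The reserved-peer variants that used to be dispatched here are gone for good:
				// `NetworkService` now resolves the protocol name to a `SetId` and talks to the
				// matching `ProtocolHandle` directly, roughly (sketch, using the fields added
				// above; the error text is illustrative):
				//
				//     let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
				//         return Err(format!("unknown protocol: {}", protocol))
				//     };
				//     self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
				//
				// Only `ReportPeer` still travels through the worker, where it ends up in
				// `PeerStoreHandle::report_peer`.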
ServiceToWorkerMsg::Request {
 target,
@@ -1291,9 +1346,6 @@ where
 .behaviour_mut()
 .user_protocol_mut()
 .set_notification_handshake(protocol, handshake),
- ServiceToWorkerMsg::ReservedPeers { pending_response } => {
- self.reserved_peers(pending_response);
- },
 }
 }

@@ -1373,7 +1425,7 @@ where
 },
 SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
 for change in changes {
- self.network_service.behaviour().user_protocol().report_peer(peer, change);
+ self.peer_store_handle.report_peer(peer, change);
 }
 },
 SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
@@ -1403,14 +1455,14 @@ where
 addr.clone(),
 );
 }
- self.network_service.behaviour_mut().user_protocol_mut().add_known_peer(peer_id);
+ self.peer_store_handle.add_known_peer(peer_id);
 // Confirm the observed address manually since they are no longer trusted by
 // default (libp2p >= 0.52)
 // TODO: remove this when/if AutoNAT is implemented.
 self.network_service.add_external_address(observed_addr);
 },
 SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
- self.network_service.behaviour_mut().user_protocol_mut().add_known_peer(peer_id);
+ self.peer_store_handle.add_known_peer(peer_id);
 },
 SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
 if let Some(metrics) = self.metrics.as_ref() {
diff --git a/client/network/src/service/traits.rs b/client/network/src/service/traits.rs
index bebf5a5713c45..bed325ede4a85 100644
--- a/client/network/src/service/traits.rs
+++ b/client/network/src/service/traits.rs
@@ -188,7 +188,8 @@ pub trait NetworkPeers {
 /// this step if the peer set is in reserved only mode.
 ///
 /// Returns an `Err` if one of the given addresses is invalid or contains an
- /// invalid peer ID (which includes the local peer ID).
+ /// invalid peer ID (which includes the local peer ID), or if `protocol` does not
+ /// refer to a known protocol.
 fn set_reserved_peers(
 &self,
 protocol: ProtocolName,
@@ -201,7 +202,8 @@ pub trait NetworkPeers {
 /// consist of only `/p2p/<peerid>`.
 ///
 /// Returns an `Err` if one of the given addresses is invalid or contains an
- /// invalid peer ID (which includes the local peer ID).
+ /// invalid peer ID (which includes the local peer ID), or if `protocol` does not
+ /// refer to a known protocol.
 fn add_peers_to_reserved_set(
 &self,
 protocol: ProtocolName,
@@ -209,7 +211,13 @@ pub trait NetworkPeers {
 ) -> Result<(), String>;

 /// Remove peers from a peer set.
- fn remove_peers_from_reserved_set(&self, protocol: ProtocolName, peers: Vec<PeerId>);
+ ///
+ /// Returns `Err` if `protocol` does not refer to a known protocol.
+ fn remove_peers_from_reserved_set(
+ &self,
+ protocol: ProtocolName,
+ peers: Vec<PeerId>,
+ ) -> Result<(), String>;

 /// Returns the number of peers in the sync peer set we're connected to.
fn sync_num_connected(&self) -> usize; @@ -277,7 +285,11 @@ where T::add_peers_to_reserved_set(self, protocol, peers) } - fn remove_peers_from_reserved_set(&self, protocol: ProtocolName, peers: Vec) { + fn remove_peers_from_reserved_set( + &self, + protocol: ProtocolName, + peers: Vec, + ) -> Result<(), String> { T::remove_peers_from_reserved_set(self, protocol, peers) } diff --git a/client/network/statement/src/lib.rs b/client/network/statement/src/lib.rs index a055cd07a0740..800534eada43c 100644 --- a/client/network/statement/src/lib.rs +++ b/client/network/statement/src/lib.rs @@ -297,10 +297,13 @@ where } }, SyncEvent::PeerDisconnected(remote) => { - self.network.remove_peers_from_reserved_set( + let result = self.network.remove_peers_from_reserved_set( self.protocol_name.clone(), iter::once(remote).collect(), ); + if let Err(err) = result { + log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}"); + } }, } } diff --git a/client/network/sync/src/service/mock.rs b/client/network/sync/src/service/mock.rs index b3ef0f328140b..885eb1f8da593 100644 --- a/client/network/sync/src/service/mock.rs +++ b/client/network/sync/src/service/mock.rs @@ -99,7 +99,11 @@ mockall::mock! { protocol: ProtocolName, peers: HashSet, ) -> Result<(), String>; - fn remove_peers_from_reserved_set(&self, protocol: ProtocolName, peers: Vec); + fn remove_peers_from_reserved_set( + &self, + protocol: ProtocolName, + peers: Vec + ) -> Result<(), String>; fn sync_num_connected(&self) -> usize; } diff --git a/client/network/test/src/peerset.rs b/client/network/test/src/fuzz.rs similarity index 78% rename from client/network/test/src/peerset.rs rename to client/network/test/src/fuzz.rs index 855d2339eda12..2e288accd80bc 100644 --- a/client/network/test/src/peerset.rs +++ b/client/network/test/src/fuzz.rs @@ -16,20 +16,22 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! Fuzz test emulates network events and peer connection handling by `ProtocolController` +//! and `PeerStore` to discover possible inconsistencies in peer management. + use futures::prelude::*; -use libp2p_identity::PeerId; +use libp2p::PeerId; use rand::{ distributions::{Distribution, Uniform, WeightedIndex}, seq::IteratorRandom, }; -use sc_peerset::{ - DropReason, IncomingIndex, Message, Peerset, PeersetConfig, ReputationChange, SetConfig, SetId, -}; -use std::{ - collections::{HashMap, HashSet}, - pin::Pin, - task::Poll, +use sc_network::{ + peer_store::{PeerStore, PeerStoreProvider}, + protocol_controller::{IncomingIndex, Message, ProtoSetConfig, ProtocolController, SetId}, + ReputationChange, }; +use sc_utils::mpsc::tracing_unbounded; +use std::collections::{HashMap, HashSet}; /// Peer events as observed by `Notifications` / fuzz test. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] @@ -83,16 +85,16 @@ fn discard_incoming_index(state: State) -> BareState { } } -#[test] -fn run() { +#[tokio::test] +async fn run() { sp_tracing::try_init_simple(); for _ in 0..50 { - test_once(); + test_once().await; } } -fn test_once() { +async fn test_once() { // Allowed events that can be received in a specific state. let allowed_events: HashMap> = [ ( @@ -129,15 +131,23 @@ fn test_once() { // Nodes that we have reserved. Always a subset of `known_nodes`. 
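	// Reserved nodes enter the test twice: the initial set goes into
	// `ProtoSetConfig::reserved_nodes` when the `ProtocolController` is built below, and the
	// weighted actions 7 and 8 later add or remove reserved peers at runtime through the
	// `ProtocolHandle`, while actions 5 and 6 toggle reserved-only mode.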
let mut reserved_nodes = HashSet::::new(); - let (mut peerset, peerset_handle) = Peerset::from_config(PeersetConfig { - sets: vec![SetConfig { - bootnodes: (0..Uniform::new_inclusive(0, 4).sample(&mut rng)) - .map(|_| { - let id = PeerId::random(); - known_nodes.insert(id, State::Disconnected); - id - }) - .collect(), + // Bootnodes for `PeerStore` initialization. + let bootnodes = (0..Uniform::new_inclusive(0, 4).sample(&mut rng)) + .map(|_| { + let id = PeerId::random(); + known_nodes.insert(id, State::Disconnected); + id + }) + .collect(); + + let peer_store = PeerStore::new(bootnodes); + let mut peer_store_handle = peer_store.handle(); + + let (to_notifications, mut from_controller) = + tracing_unbounded("test_to_notifications", 10_000); + let (protocol_handle, protocol_controller) = ProtocolController::new( + SetId::from(0), + ProtoSetConfig { reserved_nodes: { (0..Uniform::new_inclusive(0, 2).sample(&mut rng)) .map(|_| { @@ -151,22 +161,28 @@ fn test_once() { in_peers: Uniform::new_inclusive(0, 25).sample(&mut rng), out_peers: Uniform::new_inclusive(0, 25).sample(&mut rng), reserved_only: Uniform::new_inclusive(0, 10).sample(&mut rng) == 0, - }], - }); - - let new_id = PeerId::random(); - known_nodes.insert(new_id, State::Disconnected); - peerset_handle.add_known_peer(new_id); - - futures::executor::block_on(futures::future::poll_fn(move |cx| { - // List of nodes the user of `peerset` assumes it's connected to. Always a subset of - // `known_nodes`. - let mut connected_nodes = HashSet::::new(); - // List of nodes the user of `peerset` called `incoming` with and that haven't been - // accepted or rejected yet. - let mut incoming_nodes = HashMap::::new(); - // Next id for incoming connections. - let mut next_incoming_id = IncomingIndex(0); + }, + to_notifications, + Box::new(peer_store_handle.clone()), + ); + + tokio::spawn(peer_store.run()); + tokio::spawn(protocol_controller.run()); + + // List of nodes the user of `peerset` assumes it's connected to. Always a subset of + // `known_nodes`. + let mut connected_nodes = HashSet::::new(); + // List of nodes the user of `peerset` called `incoming` with and that haven't been + // accepted or rejected yet. + let mut incoming_nodes = HashMap::::new(); + // Next id for incoming connections. + let mut next_incoming_id = IncomingIndex(0); + + // The loop below is effectively synchronous, so for `PeerStore` & `ProtocolController` + // runners, spawned above, to advance, we use `spawn_blocking`. + let _ = tokio::task::spawn_blocking(move || { + // PRNG to use in `spawn_blocking` context. + let mut rng = rand::thread_rng(); // Perform a certain number of actions while checking that the state is consistent. If we // reach the end of the loop, the run has succeeded. @@ -175,17 +191,18 @@ fn test_once() { for _ in 0..2500 { // Peer we are working with. let mut current_peer = None; - // Current event for event bigrams validation. + // Current event for state transition validation. let mut current_event = None; // Last peer state for allowed event validation. let mut last_state = None; // Each of these weights corresponds to an action that we may perform. let action_weights = [150, 90, 90, 30, 30, 1, 1, 4, 4]; + match WeightedIndex::new(&action_weights).unwrap().sample(&mut rng) { - // If we generate 0, poll the peerset. - 0 => match Stream::poll_next(Pin::new(&mut peerset), cx) { - Poll::Ready(Some(Message::Connect { peer_id, .. })) => { + // If we generate 0, try to grab the next message from `ProtocolController`. 
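			// `now_or_never()` gives a single non-blocking poll of the stream:
			// `Some(Some(msg))` - a message was ready, `Some(None)` - the controller hung up,
			// `None` - nothing to process right now. The same pattern in isolation
			// (channel name illustrative):
			//
			//     use futures::{FutureExt, StreamExt};
			//     let (tx, mut rx) = sc_utils::mpsc::tracing_unbounded::<u32>("example", 16);
			//     tx.unbounded_send(1).unwrap();
			//     while let Some(Some(value)) = rx.next().now_or_never() {
			//         log::info!("got {value}");
			//     }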
+ 0 => match from_controller.next().now_or_never() { + Some(Some(Message::Connect { peer_id, .. })) => { log::info!("PSM: connecting to peer {}", peer_id); let state = known_nodes.get_mut(&peer_id).unwrap(); @@ -210,7 +227,7 @@ fn test_once() { current_peer = Some(peer_id); current_event = Some(Event::PsmConnect); }, - Poll::Ready(Some(Message::Drop { peer_id, .. })) => { + Some(Some(Message::Drop { peer_id, .. })) => { log::info!("PSM: dropping peer {}", peer_id); let state = known_nodes.get_mut(&peer_id).unwrap(); @@ -232,7 +249,7 @@ fn test_once() { current_peer = Some(peer_id); current_event = Some(Event::PsmDrop); }, - Poll::Ready(Some(Message::Accept(n))) => { + Some(Some(Message::Accept(n))) => { log::info!("PSM: accepting index {}", n.0); let peer_id = incoming_nodes.remove(&n).unwrap(); @@ -263,7 +280,7 @@ fn test_once() { current_peer = Some(peer_id); current_event = Some(Event::PsmAccept); }, - Poll::Ready(Some(Message::Reject(n))) => { + Some(Some(Message::Reject(n))) => { log::info!("PSM: rejecting index {}", n.0); let peer_id = incoming_nodes.remove(&n).unwrap(); @@ -294,22 +311,22 @@ fn test_once() { current_peer = Some(peer_id); current_event = Some(Event::PsmReject); }, - Poll::Ready(None) => panic!(), - Poll::Pending => {}, + Some(None) => panic!(), + None => {}, }, // If we generate 1, discover a new node. 1 => { let new_id = PeerId::random(); known_nodes.insert(new_id, State::Disconnected); - peerset_handle.add_known_peer(new_id); + peer_store_handle.add_known_peer(new_id); }, // If we generate 2, adjust a random reputation. 2 => if let Some(id) = known_nodes.keys().choose(&mut rng) { let val = Uniform::new_inclusive(i32::MIN, i32::MAX).sample(&mut rng); - peerset_handle.report_peer(*id, ReputationChange::new(val, "")); + peer_store_handle.report_peer(*id, ReputationChange::new(val, "")); }, // If we generate 3, disconnect from a random node. @@ -322,7 +339,7 @@ fn test_once() { last_state = Some(*state); *state = State::Disconnected; - peerset.dropped(SetId::from(0), id, DropReason::Unknown); + protocol_handle.dropped(id); current_peer = Some(id); current_event = Some(Event::Disconnected); @@ -340,7 +357,7 @@ fn test_once() { .cloned() { log::info!("Incoming connection from {}, index {}", id, next_incoming_id.0); - peerset.incoming(SetId::from(0), id, next_incoming_id); + protocol_handle.incoming_connection(id, next_incoming_id); incoming_nodes.insert(next_incoming_id, id); let state = known_nodes.get_mut(&id).unwrap(); @@ -357,11 +374,11 @@ fn test_once() { // 5 and 6 are the reserved-only mode. 5 => { log::info!("Set reserved only"); - peerset_handle.set_reserved_only(SetId::from(0), true); + protocol_handle.set_reserved_only(true); }, 6 => { log::info!("Unset reserved only"); - peerset_handle.set_reserved_only(SetId::from(0), false); + protocol_handle.set_reserved_only(false); }, // 7 and 8 are about switching a random node in or out of reserved mode. 
@@ -370,7 +387,7 @@ fn test_once() { known_nodes.keys().filter(|n| !reserved_nodes.contains(*n)).choose(&mut rng) { log::info!("Add reserved: {}", id); - peerset_handle.add_reserved_peer(SetId::from(0), *id); + protocol_handle.add_reserved_peer(*id); reserved_nodes.insert(*id); } }, @@ -378,13 +395,13 @@ fn test_once() { if let Some(id) = reserved_nodes.iter().choose(&mut rng).cloned() { log::info!("Remove reserved: {}", id); reserved_nodes.remove(&id); - peerset_handle.remove_reserved_peer(SetId::from(0), id); + protocol_handle.remove_reserved_peer(id); }, _ => unreachable!(), } - // Validate event bigrams and state transitions. + // Validate state transitions. if let Some(peer_id) = current_peer { let event = current_event.unwrap(); let last_state = discard_incoming_index(last_state.unwrap()); @@ -396,7 +413,6 @@ fn test_once() { } } } - - Poll::Ready(()) - })); + }) + .await; } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index c41cdbd5a2c21..05ed3ddb79800 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -20,6 +20,8 @@ #[cfg(test)] mod block_import; #[cfg(test)] +mod fuzz; +#[cfg(test)] mod service; #[cfg(test)] mod sync; @@ -53,6 +55,7 @@ use sc_network::{ FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, Role, SyncMode, TransportConfig, }, + peer_store::PeerStore, request_responses::ProtocolConfig as RequestResponseConfig, types::ProtocolName, Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, @@ -915,6 +918,12 @@ where }); } + let peer_store = PeerStore::new( + network_config.boot_nodes.iter().map(|bootnode| bootnode.peer_id).collect(), + ); + let peer_store_handle = peer_store.handle(); + self.spawn_task(peer_store.run().boxed()); + let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); let network = NetworkWorker::new(sc_network::config::Params { @@ -923,6 +932,7 @@ where tokio::spawn(f); }), network_config: full_net_config, + peer_store: peer_store_handle, genesis_hash, protocol_id, fork_id, diff --git a/client/network/test/src/service.rs b/client/network/test/src/service.rs index 8c15d6b09ea45..e2a9cb5f3bafd 100644 --- a/client/network/test/src/service.rs +++ b/client/network/test/src/service.rs @@ -23,6 +23,7 @@ use sc_consensus::{ImportQueue, Link}; use sc_network::{ config::{self, FullNetworkConfiguration, MultiaddrWithPeerId, ProtocolId, TransportConfig}, event::Event, + peer_store::PeerStore, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkService, NetworkStateInfo, NetworkWorker, }; @@ -220,6 +221,12 @@ impl TestNetworkBuilder { full_net_config.add_request_response_protocol(config); } + let peer_store = PeerStore::new( + network_config.boot_nodes.iter().map(|bootnode| bootnode.peer_id).collect(), + ); + let peer_store_handle = peer_store.handle(); + tokio::spawn(peer_store.run().boxed()); + let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); let worker = NetworkWorker::< @@ -233,6 +240,7 @@ impl TestNetworkBuilder { }), genesis_hash, network_config: full_net_config, + peer_store: peer_store_handle, protocol_id, fork_id, metrics_registry: None, diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 7c6e341c30fa5..389177b4aaf1b 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -550,7 +550,10 @@ async fn can_sync_explicit_forks() { .await; } 
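// The test builders above and `build_network` in `client/service` (further below) repeat
// the same setup: create the `PeerStore`, spawn its `run()` future, and hand the handle to
// `Params::peer_store`. A condensed sketch; the spawner argument is illustrative:

use futures::{future::BoxFuture, FutureExt};
use sc_network::peer_store::{PeerStore, PeerStoreHandle};

fn spawn_peer_store(
	boot_peer_ids: Vec<sc_network::PeerId>,
	spawn: impl Fn(BoxFuture<'static, ()>),
) -> PeerStoreHandle {
	let peer_store = PeerStore::new(boot_peer_ids);
	let handle = peer_store.handle();
	spawn(peer_store.run().boxed());
	// The handle is cheap to clone: one clone goes into `Params::peer_store`, others can
	// be kept around for reporting reputation changes directly.
	handle
}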
-#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO: for unknown reason, this test is flaky on a multithreaded runtime, so we run it +// in a single-threaded mode. +// See issue https://github.com/paritytech/substrate/issues/14622. +#[tokio::test] async fn syncs_header_only_forks() { sp_tracing::try_init_simple(); let mut net = TestNet::new(0); diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index 5d86e3c35d8ab..7711eaed838a2 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -338,10 +338,13 @@ where } }, SyncEvent::PeerDisconnected(remote) => { - self.network.remove_peers_from_reserved_set( + let result = self.network.remove_peers_from_reserved_set( self.protocol_name.clone(), iter::once(remote).collect(), ); + if let Err(err) = result { + log::error!(target: "sync", "Remove reserved peer failed: {}", err); + } }, } } diff --git a/client/offchain/src/api.rs b/client/offchain/src/api.rs index e6b0e30f20378..c7df5784d329e 100644 --- a/client/offchain/src/api.rs +++ b/client/offchain/src/api.rs @@ -283,7 +283,11 @@ mod tests { unimplemented!(); } - fn remove_peers_from_reserved_set(&self, _protocol: ProtocolName, _peers: Vec) { + fn remove_peers_from_reserved_set( + &self, + _protocol: ProtocolName, + _peers: Vec, + ) -> Result<(), String> { unimplemented!(); } diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index 4c11a5cb7294d..a11ac7d86ecb8 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -411,7 +411,11 @@ mod tests { unimplemented!(); } - fn remove_peers_from_reserved_set(&self, _protocol: ProtocolName, _peers: Vec) { + fn remove_peers_from_reserved_set( + &self, + _protocol: ProtocolName, + _peers: Vec, + ) -> Result<(), String> { unimplemented!(); } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 7dbdda6317534..d4cc575afec89 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -43,6 +43,7 @@ use sc_executor::{ use sc_keystore::LocalKeystore; use sc_network::{ config::{FullNetworkConfiguration, SyncMode}, + peer_store::PeerStore, NetworkService, NetworkStateInfo, NetworkStatusProvider, }; use sc_network_bitswap::BitswapRequestHandler; @@ -860,6 +861,18 @@ where ); net_config.add_notification_protocol(transactions_handler_proto.set_config()); + // Create `PeerStore` and initialize it with bootnode peer ids. + let peer_store = PeerStore::new( + net_config + .network_config + .boot_nodes + .iter() + .map(|bootnode| bootnode.peer_id) + .collect(), + ); + let peer_store_handle = peer_store.handle(); + spawn_handle.spawn("peer-store", Some("networking"), peer_store.run()); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( @@ -891,6 +904,7 @@ where }) }, network_config: net_config, + peer_store: peer_store_handle, genesis_hash, protocol_id: protocol_id.clone(), fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),