From e1d687b03d8d97014c681b0f57cc9afec282b52d Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 25 Feb 2020 15:53:33 +0100 Subject: [PATCH] Adapt to rust-libp2p#1440. --- Cargo.lock | 82 +-- Cargo.toml | 6 + client/network/src/debug_info.rs | 31 +- client/network/src/discovery.rs | 23 +- client/network/src/protocol.rs | 15 +- client/network/src/protocol/block_requests.rs | 8 +- .../src/protocol/generic_proto/behaviour.rs | 659 +++++++++++------- .../protocol/generic_proto/handler/group.rs | 34 +- .../protocol/generic_proto/handler/legacy.rs | 37 +- .../generic_proto/handler/notif_in.rs | 2 +- .../generic_proto/handler/notif_out.rs | 22 +- .../src/protocol/generic_proto/tests.rs | 11 +- .../src/protocol/light_client_handler.rs | 39 +- 13 files changed, 559 insertions(+), 410 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67346ad86bcb1..7c2f3f3518322 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2630,8 +2630,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bba17ee9cac4bb89de5812159877d9b4f0a993bf41697a5a875940cd1eb71f24" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "bytes 0.5.4", "futures 0.3.4", @@ -2657,7 +2656,7 @@ dependencies = [ "libp2p-wasm-ext", "libp2p-websocket", "libp2p-yamux", - "parity-multiaddr 0.7.2", + "parity-multiaddr 0.7.3", "parity-multihash 0.2.3", "parking_lot 0.10.0", "pin-project", @@ -2668,12 +2667,12 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b874594c4b29de1a29f27871feba8e6cd13aa54a8a1e8f8c7cf3dfac5ca287c" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "asn1_der", "bs58 0.3.0", "ed25519-dalek", + "either", "fnv", "futures 0.3.4", "futures-timer 3.0.2", @@ -2681,7 +2680,7 @@ dependencies = [ "libsecp256k1", "log 0.4.8", "multistream-select", - "parity-multiaddr 0.7.2", + "parity-multiaddr 0.7.3", "parity-multihash 0.2.3", "parking_lot 0.10.0", "pin-project", @@ -2701,8 +2700,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d472e9d522f588805c77801de10b957be84e10f019ca5f869fa1825b15ea9b" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "quote", "syn", @@ -2711,8 +2709,7 @@ dependencies = [ [[package]] name = "libp2p-deflate" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e25004d4d9837b44b22c5f1a69be1724a5168fef6cff1716b5176a972c3aa62" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "flate2", "futures 0.3.4", @@ -2722,8 +2719,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b99e552f9939b606eb4b59f7f64d9b01e3f96752f47e350fc3c5fc646ed3f649" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2733,8 +2729,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d3234f12e44f9a50351a9807b97fe7de11eb9ae4482370392ba10da6dc90722" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "cuckoofilter", "fnv", @@ -2750,8 +2745,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d46cb3e0841bd951cbf4feae56cdc081e6347836a644fb260c3ec554149b4006" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "base64 0.11.0", "byteorder 1.3.4", @@ -2775,8 +2769,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfeb935a9bd41263e4f3a24b988e9f4a044f3ae89ac284e83c17fe2f84e0d66b" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2791,8 +2784,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "464dc8412978d40f0286be72ed9ab5e0e1386a4a06e7f174526739b5c3c1f041" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "arrayvec 0.5.1", "bytes 0.5.4", @@ -2818,8 +2810,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "881fcfb360c2822db9f0e6bb6f89529621556ed9a8b038313414eda5107334de" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "async-std", "data-encoding", @@ -2840,8 +2831,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8507b37ad0eed275efcde67a023c3d85af6c80768b193845b9288e848e1af95" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "bytes 0.5.4", "fnv", @@ -2856,8 +2846,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15a8a3d71f898beb6f854c8aae27aa1d198e0d1f2e49412261c2d90ef39675a" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "curve25519-dalek 2.0.0", "futures 0.3.4", @@ -2877,8 +2866,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d22f2f228b3a828dca1cb8aa9fa331e0bc9c36510cb2c1916956e20dc85e8c" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2892,8 +2880,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56126a204d7b3382bac163143ff4125a14570b3ba76ba979103d1ae1abed1923" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "bytes 0.5.4", "futures 0.3.4", @@ -2910,8 +2897,7 @@ dependencies = [ [[package]] name = "libp2p-pnet" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b916938a8868f75180aeeffcc6a516a922d165e8fa2a90b57bad989d1ccbb57a" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "log 0.4.8", @@ -2924,8 +2910,7 @@ dependencies = [ [[package]] name = "libp2p-secio" version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1219e9ecb4945d7331a05f5ffe96a1f6e28051bfa1223d4c60353c251de0354e" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "aes-ctr", "ctr", @@ -2954,8 +2939,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "275471e7c0e88ae004660866cd54f603bd8bd1f4caef541a27f50dd8640c4d4c" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2968,8 +2952,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9e80ad4e3535345f3d666554ce347d3100453775611c05c60786bf9a1747a10" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "async-std", "futures 0.3.4", @@ -2983,8 +2966,7 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d329564a43da9d0e055a5b938633c4a8ceab1f59cec133fbc4647917c07341" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "async-std", "futures 0.3.4", @@ -2995,8 +2977,7 @@ dependencies = [ [[package]] name = "libp2p-wasm-ext" version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "923581c055bc4b8c5f42d4ce5ef43e52fe5216f1ea4bc26476cb8a966ce6220b" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "js-sys", @@ -3009,8 +2990,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5351ca9eea122081c1c0f9323164d2918cac29b5a6bfe5054d4ba8ec9447cf42" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "async-tls", "bytes 0.5.4", @@ -3030,8 +3010,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dac30de24ccde0e67f363d71a125c587bbe6589503f664947e9b084b68a34f1" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -3284,8 +3263,7 @@ checksum = "a97fbd5d00e0e37bfb10f433af8f5aaf631e739368dc9fc28286ca81ca4948dc" [[package]] name = "multistream-select" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f938ffe420493e77c8b6cbcc3f282283f68fc889c5dcbc8e51668d5f3a01ad94" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "bytes 0.5.4", "futures 0.1.29", @@ -4569,9 +4547,8 @@ dependencies = [ [[package]] name = "parity-multiaddr" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26df883298bc3f4e92528b4c5cc9f806b791955b136da3e5e939ed9de0fd958b" +version = "0.7.3" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "arrayref", "bs58 0.3.0", @@ -4603,8 +4580,7 @@ dependencies = [ [[package]] name = "parity-multihash" version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a1cd2ba02391b81367bec529fb209019d718684fdc8ad6a712c2b536e46f775" +source = "git+https://github.com/romanb/rust-libp2p?branch=multicon#86cf85b9a53f2895819ee45f55e8adcdbbe7a72f" dependencies = [ "blake2", "bytes 0.5.4", diff --git a/Cargo.toml b/Cargo.toml index 1ac2beb052fac..bf95359327246 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,3 +166,9 @@ members = [ [profile.release] # Substrate runtime requires unwinding. panic = "unwind" + +[patch.crates-io] +# libp2p = { path = "../rust-libp2p" } +libp2p = { git = "https://github.com/romanb/rust-libp2p", branch = "multicon" } + + diff --git a/client/network/src/debug_info.rs b/client/network/src/debug_info.rs index 17fb622f7cd3c..7829e56d8fe6a 100644 --- a/client/network/src/debug_info.rs +++ b/client/network/src/debug_info.rs @@ -17,7 +17,7 @@ use fnv::FnvHashMap; use futures::prelude::*; use libp2p::Multiaddr; -use libp2p::core::nodes::listeners::ListenerId; +use libp2p::core::connection::{ConnectionId, ListenerId}; use libp2p::core::{ConnectedPoint, either::EitherOutput, PeerId, PublicKey}; use libp2p::swarm::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; @@ -205,26 +205,15 @@ impl NetworkBehaviour for DebugInfoBehaviour { } } - fn inject_node_event( + fn inject_event( &mut self, peer_id: PeerId, + connection: ConnectionId, event: <::Handler as ProtocolsHandler>::OutEvent ) { match event { - EitherOutput::First(event) => self.ping.inject_node_event(peer_id, event), - EitherOutput::Second(event) => self.identify.inject_node_event(peer_id, event), - } - } - - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.ping.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone()); - self.identify.inject_replaced(peer_id.clone(), closed_endpoint, new_endpoint.clone()); - - if let Some(entry) = self.nodes_info.get_mut(&peer_id) { - entry.endpoint = new_endpoint; - } else { - error!(target: "sub-libp2p", - "Disconnected from node we were not connected to {:?}", peer_id); + EitherOutput::First(event) => self.ping.inject_event(peer_id, connection, event), + EitherOutput::Second(event) => self.identify.inject_event(peer_id, connection, event), } } @@ -285,9 +274,10 @@ impl NetworkBehaviour for DebugInfoBehaviour { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - return Poll::Ready(NetworkBehaviourAction::SendEvent { + Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, + handler, event: EitherOutput::First(event) }), Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => @@ -314,9 +304,10 @@ impl NetworkBehaviour for DebugInfoBehaviour { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - return Poll::Ready(NetworkBehaviourAction::SendEvent { + Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, + handler, event: EitherOutput::Second(event) }), Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index 8360fce518607..995e1b690a583 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -47,7 +47,7 @@ use futures::prelude::*; use futures_timer::Delay; -use libp2p::core::{nodes::listeners::ListenerId, ConnectedPoint, Multiaddr, PeerId, PublicKey}; +use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p::swarm::{ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record}; use libp2p::kad::GetClosestPeersError; @@ -149,6 +149,7 @@ impl DiscoveryBehaviour { /// If we didn't know this address before, also generates a `Discovered` event. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { + self.kademlia.add_address(&peer_id, addr.clone()); self.discoveries.push_back(peer_id.clone()); self.user_defined.push((peer_id, addr)); } @@ -261,10 +262,6 @@ impl NetworkBehaviour for DiscoveryBehaviour { NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint) } - fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) { - NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened) - } - fn inject_addr_reach_failure( &mut self, peer_id: Option<&PeerId>, @@ -274,12 +271,13 @@ impl NetworkBehaviour for DiscoveryBehaviour { NetworkBehaviour::inject_addr_reach_failure(&mut self.kademlia, peer_id, addr, error) } - fn inject_node_event( + fn inject_event( &mut self, peer_id: PeerId, + connection: ConnectionId, event: ::OutEvent, ) { - NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event) + NetworkBehaviour::inject_event(&mut self.kademlia, peer_id, connection, event) } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { @@ -332,8 +330,9 @@ impl NetworkBehaviour for DiscoveryBehaviour { while let Poll::Ready(_) = self.next_kad_random_query.poll_unpin(cx) { if self.num_connections < self.discovery_only_if_under_num { let random_peer_id = PeerId::random(); - debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \ - {:?}", random_peer_id); + debug!(target: "sub-libp2p", + "Libp2p <= Starting random Kademlia request for {:?}", + random_peer_id); self.kademlia.get_closest_peers(random_peer_id); } else { @@ -437,8 +436,8 @@ impl NetworkBehaviour for DiscoveryBehaviour { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), NetworkBehaviourAction::DialPeer { peer_id } => return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - NetworkBehaviourAction::SendEvent { peer_id, event } => - return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }), NetworkBehaviourAction::ReportObservedAddr { address } => return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), } @@ -468,7 +467,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), NetworkBehaviourAction::DialPeer { peer_id } => return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - NetworkBehaviourAction::SendEvent { event, .. } => + NetworkBehaviourAction::NotifyHandler { event, .. } => match event {}, // `event` is an enum with no variant NetworkBehaviourAction::ReportObservedAddr { address } => return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index d344321e68dd0..f3f060ffed1d0 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -20,7 +20,7 @@ use bytes::{Bytes, BytesMut}; use futures::prelude::*; use generic_proto::{GenericProto, GenericProtoOut}; use libp2p::{Multiaddr, PeerId}; -use libp2p::core::{ConnectedPoint, nodes::listeners::ListenerId}; +use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}}; use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use sp_core::storage::{StorageKey, ChildInfo}; @@ -1835,12 +1835,13 @@ impl NetworkBehaviour for Protocol { self.behaviour.inject_disconnected(peer_id, endpoint) } - fn inject_node_event( + fn inject_event( &mut self, peer_id: PeerId, + connection: ConnectionId, event: <::Handler as ProtocolsHandler>::OutEvent, ) { - self.behaviour.inject_node_event(peer_id, event) + self.behaviour.inject_event(peer_id, connection, event) } fn poll( @@ -1895,8 +1896,8 @@ impl NetworkBehaviour for Protocol { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }), Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), }; @@ -1934,10 +1935,6 @@ impl NetworkBehaviour for Protocol { } } - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint) - } - fn inject_addr_reach_failure( &mut self, peer_id: Option<&PeerId>, diff --git a/client/network/src/protocol/block_requests.rs b/client/network/src/protocol/block_requests.rs index 20a99378b16c9..ecb5f65d275d9 100644 --- a/client/network/src/protocol/block_requests.rs +++ b/client/network/src/protocol/block_requests.rs @@ -35,6 +35,7 @@ use libp2p::{ ConnectedPoint, Multiaddr, PeerId, + connection::ConnectionId, upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, upgrade::{DeniedUpgrade, read_one, write_one} }, @@ -270,7 +271,12 @@ where fn inject_disconnected(&mut self, _peer: &PeerId, _info: ConnectedPoint) { } - fn inject_node_event(&mut self, peer: PeerId, Request(request, mut stream): Request) { + fn inject_event( + &mut self, + peer: PeerId, + connection: ConnectionId, + Request(request, mut stream): Request + ) { match self.on_block_request(&peer, &request) { Ok(res) => { log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len()); diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 24e96681a084a..d459491ca4879 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -23,8 +23,8 @@ use bytes::BytesMut; use codec::Encode as _; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; -use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters}; use log::{debug, error, trace, warn}; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; @@ -34,15 +34,16 @@ use std::{error, mem, pin::Pin, str, time::Duration}; use std::task::{Context, Poll}; use wasm_timer::Instant; -/// Network behaviour that handles opening substreams for custom protocols with other nodes. +/// Network behaviour that handles opening substreams for custom protocols with other peers. /// /// ## Legacy vs new protocol /// /// The `GenericProto` behaves as following: /// -/// - Whenever a connection is established, we open a single substream (called "legay protocol" in -/// the source code). This substream name depends on the `protocol_id` and `versions` passed at -/// initialization. If the remote refuses this substream, we close the connection. +/// - Whenever a connection is established, we open a single substream (called "legacy protocol" in +/// the source code) on that connection. This substream name depends on the `protocol_id` and +/// `versions` passed at initialization. If the remote refuses this substream, we close the +/// connection. /// /// - For each registered protocol, we also open an additional substream for this protocol. If the /// remote refuses this substream, then it's fine. @@ -57,27 +58,51 @@ use wasm_timer::Instant; /// /// - The libp2p swarm that opens new connections and reports disconnects. /// - The connection handler (see `handler.rs`) that handles individual connections. -/// - The peerset manager (PSM) that requests links to nodes to be established or broken. +/// - The peerset manager (PSM) that requests links to peers to be established or broken. /// - The external API, that requires knowledge of the links that have been established. /// /// Each connection handler can be in four different states: Enabled+Open, Enabled+Closed, /// Disabled+Open, or Disabled+Closed. The Enabled/Disabled component must be in sync with the /// peerset manager. For example, if the peerset manager requires a disconnection, we disable the -/// existing handler. The Open/Closed component must be in sync with the external API. +/// connection handlers of that peer. The Open/Closed component must be in sync with the external +/// API. /// -/// However a connection handler only exists if we are actually connected to a node. What this -/// means is that there are six possible states for each node: Disconnected, Dialing (trying to -/// reach it), Enabled+Open, Enabled+Closed, Disabled+open, Disabled+Closed. Most notably, the -/// Dialing state must correspond to a "link established" state in the peerset manager. In other -/// words, the peerset manager doesn't differentiate whether we are dialing a node or connected -/// to it. +/// However, a connection handler for a peer only exists if we are actually connected to that peer. +/// What this means is that there are six possible states for each peer: Disconnected, Dialing +/// (trying to connect), Enabled+Open, Enabled+Closed, Disabled+Open, Disabled+Closed. +/// Most notably, the Dialing state must correspond to a "link established" state in the peerset +/// manager. In other words, the peerset manager doesn't differentiate whether we are dialing a +/// peer or connected to it. /// -/// Additionally, there also exists a "banning" system. If we fail to dial a node, we "ban" it for -/// a few seconds. If the PSM requests a node that is in the "banned" state, then we delay the -/// actual dialing attempt until after the ban expires, but the PSM will still consider the link -/// to be established. -/// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to -/// us, we accept the connection. The "banning" system is only about delaying dialing attempts. +/// There may be multiple connections to a peer. However, the status of a peer on +/// the API of this behaviour and towards the peerset manager is aggregated in +/// the following way: +/// +/// 1. The enabled/disabled status is the same across all connections, as +/// decided by the peerset manager. +/// 2. `send_packet` and `write_notification` always send all data over +/// the same connection to preserve the ordering provided by the transport, +/// as long as that connection is open. If it closes, a second open +/// connection may take over, if one exists, but that case should be no +/// different than a single connection failing and being re-established +/// in terms of potential reordering and dropped messages. Messages can +/// be received on any connection. +/// 3. The behaviour reports `GenericProtoOut::CustomProtocolOpen` when the +/// first connection reports `NotifsHandlerOut::Open`. +/// 4. The behaviour reports `GenericProtoOut::CustomProtocolClosed` when the +/// last connection reports `NotifsHandlerOut::Closed`. +/// +/// In this way, the number of actual established connections to the peer is +/// an implementation detail of this behaviour. Note that, in practice and at +/// the time of this writing, there may be at most two connections to a peer +/// and only as a result of simultaneous dialing. However, the implementation +/// accommodates for any number of connections. +/// +/// Additionally, there also exists a "banning" system. If we fail to dial a peer, we "ban" it for +/// a few seconds. If the PSM requests connecting to a peer that is currently "banned", the next +/// dialing attempt is delayed until after the ban expires. However, the PSM will still consider +/// the peer to be connected. This "ban" is thus not a ban in a strict sense: If a "banned" peer +/// tries to connect, the connection is accepted. A ban only delays dialing attempts. /// pub struct GenericProto { /// Legacy protocol to open with peers. Never modified. @@ -112,14 +137,14 @@ enum PeerState { /// the state machine code. Poisoned, - /// The peer misbehaved. If the PSM wants us to connect to this node, we will add an artificial + /// The peer misbehaved. If the PSM wants us to connect to this peer, we will add an artificial /// delay to the connection. Banned { - /// Until when the node is banned. + /// Until when the peer is banned. until: Instant, }, - /// The peerset requested that we connect to this peer. We are not connected to this node. + /// The peerset requested that we connect to this peer. We are currently not connected. PendingRequest { /// When to actually start dialing. timer: futures_timer::Delay, @@ -130,16 +155,13 @@ enum PeerState { /// The peerset requested that we connect to this peer. We are currently dialing this peer. Requested, - /// We are connected to this peer but the peerset refused it. This peer can still perform - /// Kademlia queries and such, but should get disconnected in a few seconds. + /// We are connected to this peer but the peerset refused it. + /// + /// We may still have ongoing traffic with that peer, but it should cease shortly. Disabled { - /// How we are connected to this peer. - connected_point: ConnectedPoint, - /// If true, we still have a custom protocol open with it. It will likely get closed in - /// a short amount of time, but we need to keep the information in order to not have a - /// state mismatch. - open: bool, - /// If `Some`, the node is banned until the given `Instant`. + /// The connections that are currently open for custom protocol traffic. + open: SmallVec<[ConnectionId; 2]>, + /// If `Some`, any dial attempts to this peer are delayed until the given `Instant`. banned_until: Option, }, @@ -147,12 +169,8 @@ enum PeerState { /// will be enabled when `timer` fires. This peer can still perform Kademlia queries and such, /// but should get disconnected in a few seconds. DisabledPendingEnable { - /// How we are connected to this peer. - connected_point: ConnectedPoint, - /// If true, we still have a custom protocol open with it. It will likely get closed in - /// a short amount of time, but we need to keep the information in order to not have a - /// state mismatch. - open: bool, + /// The connections that are currently open for custom protocol traffic. + open: SmallVec<[ConnectionId; 2]>, /// When to enable this remote. timer: futures_timer::Delay, /// When the `timer` will trigger. @@ -162,33 +180,37 @@ enum PeerState { /// We are connected to this peer and the peerset has accepted it. The handler is in the /// enabled state. Enabled { - /// How we are connected to this peer. - connected_point: ConnectedPoint, - /// If true, we have a custom protocol open with this peer. - open: bool, + /// The connections that are currently open for custom protocol traffic. + open: SmallVec<[ConnectionId; 2]>, }, - /// We are connected to this peer, and we sent an incoming message to the peerset. The handler - /// is in initialization mode. We are waiting for the Accept or Reject from the peerset. There - /// is a corresponding entry in `incoming`. - Incoming { - /// How we are connected to this peer. - connected_point: ConnectedPoint, - }, + /// We received an incoming connection from this peer and forwarded that + /// connection request to the peerset. The connection handlers are waiting + /// for initialisation, i.e. to be enabled or disabled based on whether + /// the peerset accepts or rejects the peer. + Incoming, } impl PeerState { - /// True if we have an open channel with that node. + /// True if there exists an established connection to tbe peer + /// that is open for custom protocol traffic. fn is_open(&self) -> bool { + self.get_open().is_some() + } + + /// Returns the connection ID of the first established connection + /// that is open for custom protocol traffic. + fn get_open(&self) -> Option { match self { - PeerState::Poisoned => false, - PeerState::Banned { .. } => false, - PeerState::PendingRequest { .. } => false, - PeerState::Requested => false, - PeerState::Disabled { open, .. } => *open, - PeerState::DisabledPendingEnable { open, .. } => *open, - PeerState::Enabled { open, .. } => *open, - PeerState::Incoming { .. } => false, + PeerState::Disabled { open, .. } | + PeerState::DisabledPendingEnable { open, .. } | + PeerState::Enabled { open, .. } => + if !open.is_empty() { + Some(open[0]) + } else { + None + } + _ => None } } } @@ -196,7 +218,7 @@ impl PeerState { /// State of an "incoming" message sent to the peer set manager. #[derive(Debug)] struct IncomingPeer { - /// Id of the node that is concerned. + /// Id of the remote peer of the incoming connection. peer_id: PeerId, /// If true, this "incoming" still corresponds to an actual connection. If false, then the /// connection corresponding to it has been closed or replaced already. @@ -210,10 +232,8 @@ struct IncomingPeer { pub enum GenericProtoOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { - /// Id of the node we have opened a connection with. + /// Id of the peer we are connected to. peer_id: PeerId, - /// Endpoint used for this custom protocol. - endpoint: ConnectedPoint, }, /// Closed a custom protocol with the remote. @@ -282,7 +302,7 @@ impl GenericProto { self.peers.iter().filter(|(_, state)| state.is_open()).map(|(id, _)| id) } - /// Returns true if we have a channel open with this node. + /// Returns true if we have an open connection to the given peer. pub fn is_open(&self, peer_id: &PeerId) -> bool { self.peers.get(peer_id).map(|p| p.is_open()).unwrap_or(false) } @@ -293,8 +313,8 @@ impl GenericProto { self.disconnect_peer_inner(peer_id, None); } - /// Inner implementation of `disconnect_peer`. If `ban` is `Some`, we ban the node for the - /// specific duration. + /// Inner implementation of `disconnect_peer`. If `ban` is `Some`, we ban the peer + /// for the specific duration. fn disconnect_peer_inner(&mut self, peer_id: &PeerId, ban: Option) { let mut entry = if let Entry::Occupied(entry) = self.peers.entry(peer_id.clone()) { entry @@ -310,7 +330,11 @@ impl GenericProto { st @ PeerState::Banned { .. } => *entry.into_mut() = st, // DisabledPendingEnable => Disabled. - PeerState::DisabledPendingEnable { open, connected_point, timer_deadline, .. } => { + PeerState::DisabledPendingEnable { + open, + timer_deadline, + timer: _ + } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); let banned_until = Some(if let Some(ban) = ban { @@ -318,24 +342,31 @@ impl GenericProto { } else { timer_deadline }); - *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until } + *entry.into_mut() = PeerState::Disabled { + open, + banned_until + } }, // Enabled => Disabled. - PeerState::Enabled { open, connected_point } => { + PeerState::Enabled { open } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), + handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, }); let banned_until = ban.map(|dur| Instant::now() + dur); - *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until } + *entry.into_mut() = PeerState::Disabled { + open, + banned_until + } }, // Incoming => Disabled. - PeerState::Incoming { connected_point, .. } => { + PeerState::Incoming => { let inc = if let Some(inc) = self.incoming.iter_mut() .find(|i| i.peer_id == *entry.key() && i.alive) { inc @@ -347,12 +378,16 @@ impl GenericProto { inc.alive = false; debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), + handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, }); let banned_until = ban.map(|dur| Instant::now() + dur); - *entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until } + *entry.into_mut() = PeerState::Disabled { + open: SmallVec::new(), + banned_until + } }, PeerState::Poisoned => @@ -391,9 +426,15 @@ impl GenericProto { protocol_name: Cow<'static, [u8]>, message: impl Into>, ) { - if !self.is_open(target) { - return; - } + let conn = match self.peers.get(target).and_then(|p| p.get_open()) { + None => { + debug!(target: "sub-libp2p", + "Tried to sent notification to {:?} without an open channel.", + target); + return + }, + Some(conn) => conn + }; trace!( target: "sub-libp2p", @@ -403,8 +444,9 @@ impl GenericProto { ); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: target.clone(), + handler: NotifyHandler::One(conn), event: NotifsHandlerIn::SendNotification { message: message.into(), engine_id, @@ -420,14 +462,21 @@ impl GenericProto { /// Also note that even we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. pub fn send_packet(&mut self, target: &PeerId, message: Vec) { - if !self.is_open(target) { - return; - } + let conn = match self.peers.get(target).and_then(|p| p.get_open()) { + None => { + debug!(target: "sub-libp2p", + "Tried to sent packet to {:?} without an open channel.", + target); + return + } + Some(conn) => conn + }; trace!(target: "sub-libp2p", "External API => Packet for {:?}", target); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: target.clone(), + handler: NotifyHandler::One(conn), event: NotifsHandlerIn::SendLegacy { message, } @@ -439,7 +488,7 @@ impl GenericProto { self.peerset.debug_info() } - /// Function that is called when the peerset wants us to connect to a node. + /// Function that is called when the peerset wants us to connect to a peer. fn peerset_report_connect(&mut self, peer_id: PeerId) { let mut occ_entry = match self.peers.entry(peer_id) { Entry::Occupied(entry) => entry, @@ -472,32 +521,34 @@ impl GenericProto { *occ_entry.into_mut() = PeerState::Requested; }, - PeerState::Disabled { open, ref connected_point, banned_until: Some(ref banned) } - if *banned > now => { - debug!(target: "sub-libp2p", "PSM => Connect({:?}): Has idle connection through \ - {:?} but node is banned until {:?}", occ_entry.key(), connected_point, banned); + PeerState::Disabled { + open, + banned_until: Some(ref banned) + } if *banned > now => { + debug!(target: "sub-libp2p", "PSM => Connect({:?}): But peer is banned until {:?}", + occ_entry.key(), banned); *occ_entry.into_mut() = PeerState::DisabledPendingEnable { - connected_point: connected_point.clone(), open, timer: futures_timer::Delay::new(banned.clone() - now), timer_deadline: banned.clone(), }; }, - PeerState::Disabled { open, connected_point, banned_until: _ } => { - debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling previously-idle \ - connection through {:?}", occ_entry.key(), connected_point); + PeerState::Disabled { open, banned_until: _ } => { + debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling connections.", + occ_entry.key()); debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), + handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); - *occ_entry.into_mut() = PeerState::Enabled { connected_point, open }; + *occ_entry.into_mut() = PeerState::Enabled { open }; }, - PeerState::Incoming { connected_point, .. } => { - debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling incoming \ - connection through {:?}", occ_entry.key(), connected_point); + PeerState::Incoming => { + debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling connections.", + occ_entry.key()); if let Some(inc) = self.incoming.iter_mut() .find(|i| i.peer_id == *occ_entry.key() && i.alive) { inc.alive = false; @@ -506,26 +557,30 @@ impl GenericProto { incoming for incoming peer") } debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), + handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); - *occ_entry.into_mut() = PeerState::Enabled { connected_point, open: false }; + *occ_entry.into_mut() = PeerState::Enabled { open: SmallVec::new() }; }, st @ PeerState::Enabled { .. } => { - warn!(target: "sub-libp2p", "PSM => Connect({:?}): Already connected to this \ - peer", occ_entry.key()); + warn!(target: "sub-libp2p", + "PSM => Connect({:?}): Already connected.", + occ_entry.key()); *occ_entry.into_mut() = st; }, st @ PeerState::DisabledPendingEnable { .. } => { - warn!(target: "sub-libp2p", "PSM => Connect({:?}): Already have an idle \ - connection to this peer and waiting to enable it", occ_entry.key()); + warn!(target: "sub-libp2p", + "PSM => Connect({:?}): Already pending enabling.", + occ_entry.key()); *occ_entry.into_mut() = st; }, st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => { - warn!(target: "sub-libp2p", "PSM => Connect({:?}): Received a previous \ - request for that peer", occ_entry.key()); + warn!(target: "sub-libp2p", + "PSM => Connect({:?}): Duplicate request.", + occ_entry.key()); *occ_entry.into_mut() = st; }, @@ -534,55 +589,63 @@ impl GenericProto { } } - /// Function that is called when the peerset wants us to disconnect from a node. + /// Function that is called when the peerset wants us to disconnect from a peer. fn peerset_report_disconnect(&mut self, peer_id: PeerId) { let mut entry = match self.peers.entry(peer_id) { Entry::Occupied(entry) => entry, Entry::Vacant(entry) => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Node already disabled", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Already disabled.", entry.key()); return } }; match mem::replace(entry.get_mut(), PeerState::Poisoned) { st @ PeerState::Disabled { .. } | st @ PeerState::Banned { .. } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Node already disabled", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Already disabled.", entry.key()); *entry.into_mut() = st; }, - PeerState::DisabledPendingEnable { open, connected_point, timer_deadline, .. } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Interrupting pending \ - enable", entry.key()); + PeerState::DisabledPendingEnable { + open, + timer_deadline, + timer: _ + } => { + debug!(target: "sub-libp2p", + "PSM => Drop({:?}): Interrupting pending enabling.", + entry.key()); *entry.into_mut() = PeerState::Disabled { open, - connected_point, banned_until: Some(timer_deadline), }; }, - PeerState::Enabled { open, connected_point } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Disabling connection", entry.key()); + PeerState::Enabled { open } => { + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Disabling connections.", entry.key()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", entry.key()); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: entry.key().clone(), + handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, }); - *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until: None } + *entry.into_mut() = PeerState::Disabled { + open, + banned_until: None + } }, - st @ PeerState::Incoming { .. } => { - error!(target: "sub-libp2p", "PSM => Drop({:?}): Was in incoming mode", + st @ PeerState::Incoming => { + error!(target: "sub-libp2p", "PSM => Drop({:?}): Not enabled (Incoming).", entry.key()); *entry.into_mut() = st; }, PeerState::Requested => { // We don't cancel dialing. Libp2p doesn't expose that on purpose, as other - // sub-systems (such as the discovery mechanism) may require dialing this node as + // sub-systems (such as the discovery mechanism) may require dialing this peer as // well at the same time. - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Not yet connected.", entry.key()); entry.remove(); }, PeerState::PendingRequest { timer_deadline, .. } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Not yet connected", entry.key()); *entry.into_mut() = PeerState::Banned { until: timer_deadline } }, @@ -591,7 +654,8 @@ impl GenericProto { } } - /// Function that is called when the peerset wants us to accept an incoming node. + /// Function that is called when the peerset wants us to accept a connection + /// request from a peer. fn peerset_report_accept(&mut self, index: sc_peerset::IncomingIndex) { let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) { self.incoming.remove(pos) @@ -608,34 +672,25 @@ impl GenericProto { return } - let state = if let Some(state) = self.peers.get_mut(&incoming.peer_id) { - state - } else { - error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in peers \ - corresponding to an alive incoming"); - return - }; - - let connected_point = if let PeerState::Incoming { connected_point } = state { - connected_point.clone() - } else { - error!(target: "sub-libp2p", "State mismatch in libp2p: entry in peers corresponding \ - to an alive incoming is not in incoming state"); - return - }; - - debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Enabling connection \ - through {:?}", index, incoming.peer_id, connected_point); - debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", incoming.peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { - peer_id: incoming.peer_id, - event: NotifsHandlerIn::Enable, - }); - - *state = PeerState::Enabled { open: false, connected_point }; + match self.peers.get_mut(&incoming.peer_id) { + Some(state @ PeerState::Incoming) => { + debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Enabling connections.", + index, incoming.peer_id); + debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", incoming.peer_id); + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: incoming.peer_id, + handler: NotifyHandler::All, + event: NotifsHandlerIn::Enable, + }); + *state = PeerState::Enabled { open: SmallVec::new() }; + } + peer => error!(target: "sub-libp2p", + "State mismatch in libp2p: Expected alive incoming. Got {:?}.", + peer) + } } - /// Function that is called when the peerset wants us to reject an incoming node. + /// Function that is called when the peerset wants us to reject an incoming peer. fn peerset_report_reject(&mut self, index: sc_peerset::IncomingIndex) { let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) { self.incoming.remove(pos) @@ -650,30 +705,25 @@ impl GenericProto { return } - let state = if let Some(state) = self.peers.get_mut(&incoming.peer_id) { - state - } else { - error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in peers \ - corresponding to an alive incoming"); - return - }; - - let connected_point = if let PeerState::Incoming { connected_point } = state { - connected_point.clone() - } else { - error!(target: "sub-libp2p", "State mismatch in libp2p: entry in peers corresponding \ - to an alive incoming is not in incoming state"); - return - }; - - debug!(target: "sub-libp2p", "PSM => Reject({:?}, {:?}): Rejecting connection through \ - {:?}", index, incoming.peer_id, connected_point); - debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", incoming.peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { - peer_id: incoming.peer_id, - event: NotifsHandlerIn::Disable, - }); - *state = PeerState::Disabled { open: false, connected_point, banned_until: None }; + match self.peers.get_mut(&incoming.peer_id) { + Some(state @ PeerState::Incoming) => { + debug!(target: "sub-libp2p", "PSM => Reject({:?}, {:?}): Rejecting connections.", + index, incoming.peer_id); + debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", incoming.peer_id); + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: incoming.peer_id, + handler: NotifyHandler::All, + event: NotifsHandlerIn::Disable, + }); + *state = PeerState::Disabled { + open: SmallVec::new(), + banned_until: None + }; + } + peer => error!(target: "sub-libp2p", + "State mismatch in libp2p: Expected alive incoming. Got {:?}.", + peer) + } } } @@ -702,20 +752,16 @@ impl NetworkBehaviour for GenericProto { match (self.peers.entry(peer_id.clone()).or_insert(PeerState::Poisoned), connected_point) { (st @ &mut PeerState::Requested, connected_point) | (st @ &mut PeerState::PendingRequest { .. }, connected_point) => { - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}): Connection \ - requested by PSM (through {:?})", peer_id, connected_point + debug!(target: "sub-libp2p", + "Libp2p => Connected({:?}, {:?}): Connection was requested by PSM.", + peer_id, connected_point ); - debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { - peer_id: peer_id.clone(), - event: NotifsHandlerIn::Enable, - }); - *st = PeerState::Enabled { open: false, connected_point }; + *st = PeerState::Enabled { open: SmallVec::new() }; } - // Note: it may seem weird that "Banned" nodes get treated as if there were absent. + // Note: it may seem weird that "Banned" peers get treated as if they were absent. // This is because the word "Banned" means "temporarily prevent outgoing connections to - // this node", and not "banned" in the sense that we would refuse the node altogether. + // this peer", and not "banned" in the sense that we would refuse the peer altogether. (st @ &mut PeerState::Poisoned, connected_point @ ConnectedPoint::Listener { .. }) | (st @ &mut PeerState::Banned { .. }, connected_point @ ConnectedPoint::Listener { .. }) => { let incoming_id = self.next_incoming_index.clone(); @@ -726,17 +772,17 @@ impl NetworkBehaviour for GenericProto { return } }; - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}): Incoming connection", - peer_id); - debug!(target: "sub-libp2p", "PSM <= Incoming({:?}, {:?}): Through {:?}", - incoming_id, peer_id, connected_point); + debug!(target: "sub-libp2p", "Libp2p => Connected({:?}, {:?}): Incoming connection", + peer_id, connected_point); + debug!(target: "sub-libp2p", "PSM <= Incoming({:?}, {:?}).", + incoming_id, peer_id); self.peerset.incoming(peer_id.clone(), incoming_id); self.incoming.push(IncomingPeer { peer_id: peer_id.clone(), alive: true, incoming_id, }); - *st = PeerState::Incoming { connected_point }; + *st = PeerState::Incoming { }; } (st @ &mut PeerState::Poisoned, connected_point) | @@ -746,20 +792,16 @@ impl NetworkBehaviour for GenericProto { } else { None }; - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}): Requested by something \ - else than PSM, disabling", peer_id); - debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { - peer_id: peer_id.clone(), - event: NotifsHandlerIn::Disable, - }); - *st = PeerState::Disabled { open: false, connected_point, banned_until }; + debug!(target: "sub-libp2p", + "Libp2p => Connected({:?},{:?}): Not requested by PSM, disabling.", + peer_id, connected_point); + *st = PeerState::Disabled { open: SmallVec::new(), banned_until }; } st => { // This is a serious bug either in this state machine or in libp2p. error!(target: "sub-libp2p", "Received inject_connected for \ - already-connected node; state is {:?}", st + already-connected peer; state is {:?}", st ); } } @@ -771,15 +813,16 @@ impl NetworkBehaviour for GenericProto { Some(PeerState::Banned { .. }) => // This is a serious bug either in this state machine or in libp2p. error!(target: "sub-libp2p", "Received inject_disconnected for non-connected \ - node {:?}", peer_id), + peer {:?}", peer_id), - Some(PeerState::Disabled { open, banned_until, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ - (through {:?})", peer_id, endpoint); + Some(PeerState::Disabled { open, banned_until }) => { + debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?}): Was disabled.", + peer_id, endpoint); if let Some(until) = banned_until { self.peers.insert(peer_id.clone(), PeerState::Banned { until }); } - if open { + + if !open.is_empty() { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), @@ -791,12 +834,13 @@ impl NetworkBehaviour for GenericProto { } Some(PeerState::DisabledPendingEnable { open, timer_deadline, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ - (through {:?}) but pending enable", peer_id, endpoint); + debug!(target: "sub-libp2p", + "Libp2p => Disconnected({:?}, {:?}): Was disabled but pending enable.", + peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline }); - if open { + if !open.is_empty() { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), @@ -808,8 +852,9 @@ impl NetworkBehaviour for GenericProto { } Some(PeerState::Enabled { open, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was enabled \ - (through {:?})", peer_id, endpoint); + debug!(target: "sub-libp2p", + "Libp2p => Disconnected({:?}, {:?}): Was enabled.", + peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); @@ -818,7 +863,7 @@ impl NetworkBehaviour for GenericProto { until: Instant::now() + Duration::from_secs(ban_dur) }); - if open { + if !open.is_empty() { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), @@ -831,10 +876,11 @@ impl NetworkBehaviour for GenericProto { // In the incoming state, we don't report "Dropped". Instead we will just ignore the // corresponding Accept/Reject. - Some(PeerState::Incoming { .. }) => { + Some(PeerState::Incoming { }) => { if let Some(state) = self.incoming.iter_mut().find(|i| i.peer_id == *peer_id) { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was in incoming \ - mode (id {:?}, through {:?})", peer_id, state.incoming_id, endpoint); + debug!(target: "sub-libp2p", + "Libp2p => Disconnected({:?},{:?}): Was in incoming mode with id {:?}.", + peer_id, endpoint, state.incoming_id); state.alive = false; } else { error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in incoming \ @@ -854,13 +900,13 @@ impl NetworkBehaviour for GenericProto { fn inject_dial_failure(&mut self, peer_id: &PeerId) { if let Entry::Occupied(mut entry) = self.peers.entry(peer_id.clone()) { match mem::replace(entry.get_mut(), PeerState::Poisoned) { - // The node is not in our list. + // The peer is not in our list. st @ PeerState::Banned { .. } => { trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); *entry.into_mut() = st; }, - // "Basic" situation: we failed to reach a node that the peerset requested. + // "Basic" situation: we failed to reach a peer that the peerset requested. PeerState::Requested | PeerState::PendingRequest { .. } => { debug!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); *entry.into_mut() = PeerState::Banned { @@ -870,7 +916,7 @@ impl NetworkBehaviour for GenericProto { self.peerset.dropped(peer_id.clone()) }, - // We can still get dial failures even if we are already connected to the node, + // We can still get dial failures even if we are already connected to the peer, // as an extra diagnostic for an earlier attempt. st @ PeerState::Disabled { .. } | st @ PeerState::Enabled { .. } | st @ PeerState::DisabledPendingEnable { .. } | st @ PeerState::Incoming { .. } => { @@ -883,92 +929,168 @@ impl NetworkBehaviour for GenericProto { } } else { - // The node is not in our list. + // The peer is not in our list. trace!(target: "sub-libp2p", "Libp2p => Dial failure for {:?}", peer_id); } } - fn inject_node_event( + fn inject_event( &mut self, source: PeerId, + connection: ConnectionId, event: NotifsHandlerOut, ) { match event { - NotifsHandlerOut::Closed { reason } => { - debug!(target: "sub-libp2p", "Handler({:?}) => Closed: {}", source, reason); + NotifsHandlerOut::Init => { + debug!(target: "sub-libp2p", "Handler({:?}) => Init", source); - let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { + let entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { entry } else { - error!(target: "sub-libp2p", "State mismatch in the custom protos handler"); + error!(target: "sub-libp2p", "Init: State mismatch in the custom protos handler"); return }; - debug!(target: "sub-libp2p", "External API <= Closed({:?})", source); - let event = GenericProtoOut::CustomProtocolClosed { - reason, - peer_id: source.clone(), + let event = match entry.get() { + // Waiting for a decision from the PSM. Let the handler stay + // in initialisation state. + PeerState::Incoming { .. } => None, + PeerState::Enabled { .. } => { + debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", source); + Some(NotifsHandlerIn::Enable) + } + PeerState::Disabled { .. } => { + debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); + Some(NotifsHandlerIn::Disable) + } + state => { + error!(target: "sub-libp2p", + "Unexpected peer state on request for handler initialisation: {:?}", + state); + None + } }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); - match mem::replace(entry.get_mut(), PeerState::Poisoned) { - PeerState::Enabled { open, connected_point } => { - debug_assert!(open); + if let Some(event) = event { + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event + }); + } + } + NotifsHandlerOut::Closed { endpoint, reason } => { + debug!(target: "sub-libp2p", + "Handler({:?}) => Endpoint {:?} closed for custom protocols: {}", + source, endpoint, reason); + + let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { + entry + } else { + error!(target: "sub-libp2p", "Closed: State mismatch in the custom protos handler"); + return + }; - debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source); - self.peerset.dropped(source.clone()); + let last = match mem::replace(entry.get_mut(), PeerState::Poisoned) { + PeerState::Enabled { mut open } => { + debug_assert!(open.iter().any(|c| c == &connection)); + open.retain(|c| c != &connection); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); - self.events.push(NetworkBehaviourAction::SendEvent { + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: source.clone(), + handler: NotifyHandler::One(connection), event: NotifsHandlerIn::Disable, }); + let last = open.is_empty(); + + if last { + debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source); + self.peerset.dropped(source.clone()); + *entry.into_mut() = PeerState::Disabled { + open, + banned_until: None + }; + } else { + *entry.into_mut() = PeerState::Enabled { open }; + } + + last + }, + PeerState::Disabled { mut open, banned_until } => { + debug_assert!(open.iter().any(|c| c == &connection)); + open.retain(|c| c != &connection); + let last = open.is_empty(); *entry.into_mut() = PeerState::Disabled { - open: false, - connected_point, - banned_until: None + open, + banned_until }; + last }, - PeerState::Disabled { open, connected_point, banned_until } => { - debug_assert!(open); - *entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until }; - }, - PeerState::DisabledPendingEnable { open, connected_point, timer, timer_deadline } => { - debug_assert!(open); + PeerState::DisabledPendingEnable { + mut open, + timer, + timer_deadline + } => { + debug_assert!(open.iter().any(|c| c == &connection)); + open.retain(|c| c != &connection); + let last = open.is_empty(); *entry.into_mut() = PeerState::DisabledPendingEnable { - open: false, - connected_point, + open, timer, timer_deadline }; + last }, - _ => error!(target: "sub-libp2p", "State mismatch in the custom protos handler"), + state => { + error!(target: "sub-libp2p", + "Unexpected state in the custom protos handler: {:?}", + state); + return + } + }; + + if last { + debug!(target: "sub-libp2p", "External API <= Closed({:?})", source); + let event = GenericProtoOut::CustomProtocolClosed { + reason, + peer_id: source.clone(), + }; + self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + } else { + debug!(target: "sub-libp2p", "Secondary connection closed custom protocol."); } } - NotifsHandlerOut::Open => { - debug!(target: "sub-libp2p", "Handler({:?}) => Open", source); - let endpoint = match self.peers.get_mut(&source) { - Some(PeerState::Enabled { ref mut open, ref connected_point }) | - Some(PeerState::DisabledPendingEnable { ref mut open, ref connected_point, .. }) | - Some(PeerState::Disabled { ref mut open, ref connected_point, .. }) if !*open => { - *open = true; - connected_point.clone() + NotifsHandlerOut::Open { endpoint } => { + debug!(target: "sub-libp2p", + "Handler({:?}) => Endpoint {:?} open for custom protocols.", + source, endpoint); + + let first = match self.peers.get_mut(&source) { + Some(PeerState::Enabled { ref mut open, .. }) | + Some(PeerState::DisabledPendingEnable { ref mut open, .. }) | + Some(PeerState::Disabled { ref mut open, .. }) => { + let first = open.is_empty(); + open.push(connection); + first } - _ => { - error!(target: "sub-libp2p", "State mismatch in the custom protos handler"); + state => { + error!(target: "sub-libp2p", + "Open: Unexpected state in the custom protos handler: {:?}", + state); return } }; - debug!(target: "sub-libp2p", "External API <= Open({:?})", source); - let event = GenericProtoOut::CustomProtocolOpen { - peer_id: source, - endpoint, - }; - - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + if first { + debug!(target: "sub-libp2p", "External API <= Open({:?})", source); + let event = GenericProtoOut::CustomProtocolOpen { peer_id: source }; + self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + } else { + debug!(target: "sub-libp2p", "Secondary connection opened custom protocol."); + } } NotifsHandlerOut::CustomMessage { message } => { @@ -1027,11 +1149,12 @@ impl NetworkBehaviour for GenericProto { } NotifsHandlerOut::ProtocolError { error, .. } => { - debug!(target: "sub-libp2p", "Handler({:?}) => Severe protocol error: {:?}", + warn!(target: "sub-libp2p", + "Handler({:?}) => Severe protocol error: {:?}", source, error); - // A severe protocol error happens when we detect a "bad" node, such as a node on - // a different chain, or a node that doesn't speak the same protocol(s). We - // decrease the node's reputation, hence lowering the chances we try this node + // A severe protocol error happens when we detect a "bad" peer, such as a per on + // a different chain, or a peer that doesn't speak the same protocol(s). We + // decrease the peer's reputation, hence lowering the chances we try this peer // again in the short term. self.peerset.report_peer( source.clone(), @@ -1089,23 +1212,27 @@ impl NetworkBehaviour for GenericProto { *peer_state = PeerState::Requested; } - PeerState::DisabledPendingEnable { mut timer, connected_point, open, timer_deadline } => { + PeerState::DisabledPendingEnable { + mut timer, + open, + timer_deadline + } => { if let Poll::Pending = Pin::new(&mut timer).poll(cx) { *peer_state = PeerState::DisabledPendingEnable { timer, - connected_point, open, timer_deadline }; continue; } - debug!(target: "sub-libp2p", "Handler({:?}) <= Enable now that ban has expired", peer_id); - self.events.push(NetworkBehaviourAction::SendEvent { + debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id); + self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), + handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); - *peer_state = PeerState::Enabled { connected_point, open }; + *peer_state = PeerState::Enabled { open }; } st @ _ => *peer_state = st, diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index d6d9919d3e14d..90cd7f3d87107 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -145,7 +145,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { } /// Event that can be received by a `NotifsHandler`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum NotifsHandlerIn { /// The node should start using custom protocols. Enable, @@ -186,13 +186,25 @@ pub enum NotifsHandlerIn { /// Event that can be emitted by a `NotifsHandler`. #[derive(Debug)] pub enum NotifsHandlerOut { - /// Opened the substreams with the remote. - Open, + /// The connection handler is requesting initialisation, i.e. + /// to be either enabled or disabled. + /// + /// This is always the first event emitted by a handler and it is only + /// emitted once. + Init, + + /// The connection is open for custom protocols. + Open { + /// The endpoint of the connection that is open for custom protocols. + endpoint: ConnectedPoint, + }, - /// Closed the substreams with the remote. + /// The connection is closed for custom protocols. Closed { - /// Reason why the substream closed, for diagnostic purposes. + /// The reason for closing, for diagnostic purposes. reason: Cow<'static, str>, + /// The endpoint of the connection that closed for custom protocols. + endpoint: ConnectedPoint, }, /// Received a non-gossiping message on the legacy substream. @@ -493,13 +505,17 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { .. }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Init + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open + NotifsHandlerOut::Open { endpoint } )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Closed { reason } + NotifsHandlerOut::Closed { endpoint, reason } )), ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index a2d2fc9246d1c..d3993570e6da2 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -40,9 +40,8 @@ use std::{pin::Pin, task::{Context, Poll}}; /// it is turned into a `LegacyProtoHandler`. It then handles all communications that are specific /// to Substrate on that single connection. /// -/// Note that there can be multiple instance of this struct simultaneously for same peer. However -/// if that happens, only one main instance can communicate with the outer layers of the code. In -/// other words, the outer layers of the code only ever see one handler. +/// Note that there can be multiple instance of this struct simultaneously for same peer, +/// if there are multiple established connections to the peer. /// /// ## State of the handler /// @@ -61,6 +60,7 @@ use std::{pin::Pin, task::{Context, Poll}}; /// these states. For example, if the handler reports a network misbehaviour, it will close the /// substreams but it is the role of the user to send a `Disabled` event if it wants the connection /// to close. Otherwise, the handler will try to reopen substreams. +/// /// The handler starts in the "Initializing" state and must be transitionned to Enabled or Disabled /// as soon as possible. /// @@ -109,16 +109,18 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { } fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { - LegacyProtoHandler { + let mut handler = LegacyProtoHandler { protocol: self.protocol, - endpoint: connected_point.to_endpoint(), + endpoint: connected_point.clone(), remote_peer_id: remote_peer_id.clone(), state: ProtocolState::Init { substreams: SmallVec::new(), init_deadline: Delay::new(Duration::from_secs(5)) }, events_queue: SmallVec::new(), - } + }; + handler.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init {})); + handler } } @@ -136,7 +138,7 @@ pub struct LegacyProtoHandler { /// Whether we are the connection dialer or listener. Used to determine who, between the local /// node and the remote node, has priority. - endpoint: Endpoint, + endpoint: ConnectedPoint, /// Queue of events to send to the outside. /// @@ -214,16 +216,26 @@ pub enum LegacyProtoHandlerIn { /// Event that can be emitted by a `LegacyProtoHandler`. #[derive(Debug)] pub enum LegacyProtoHandlerOut { + /// The handler is requesting initialisation. + /// + /// This is always the first event emitted by a handler, and it is only + /// emitted once. + Init, + /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. version: u8, + /// The connected endpoint. + endpoint: ConnectedPoint, }, /// Closed a custom protocol with the remote. CustomProtocolClosed { /// Reason why the substream closed, for diagnostic purposes. reason: Cow<'static, str>, + /// The connected endpoint. + endpoint: ConnectedPoint, }, /// Receives a message on a custom protocol substream. @@ -272,7 +284,7 @@ impl LegacyProtoHandler { ProtocolState::Init { substreams: incoming, .. } => { if incoming.is_empty() { - if let Endpoint::Dialer = self.endpoint { + if let ConnectedPoint::Dialer { .. } = self.endpoint { self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.protocol.clone()), info: (), @@ -281,10 +293,10 @@ impl LegacyProtoHandler { ProtocolState::Opening { deadline: Delay::new(Duration::from_secs(60)) } - } else { let event = LegacyProtoHandlerOut::CustomProtocolOpen { - version: incoming[0].protocol_version() + version: incoming[0].protocol_version(), + endpoint: self.endpoint.clone() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { @@ -404,6 +416,7 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: "All substreams have been closed by the remote".into(), + endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -416,6 +429,7 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: format!("Error on the last substream: {:?}", err).into(), + endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -479,7 +493,8 @@ impl LegacyProtoHandler { ProtocolState::Opening { .. } => { let event = LegacyProtoHandlerOut::CustomProtocolOpen { - version: substream.protocol_version() + version: substream.protocol_version(), + endpoint: self.endpoint.clone() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index 4e16fb1af419f..a4117338b92a0 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -72,7 +72,7 @@ pub struct NotifsInHandler { } /// Event that can be received by a `NotifsInHandler`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum NotifsInHandlerIn { /// Can be sent back as a response to an `OpenRequest`. Contains the status message to send /// to the remote. diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index 8c64491d99717..f29ab72c3a960 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -33,7 +33,7 @@ use libp2p::swarm::{ SubstreamProtocol, NegotiatedSubstream, }; -use log::error; +use log::{debug, warn, error}; use smallvec::SmallVec; use std::{borrow::Cow, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; use wasm_timer::Instant; @@ -268,7 +268,7 @@ impl ProtocolsHandler for NotifsOutHandler { // be recovered. When in doubt, let's drop the existing substream and // open a new one. if sub.close().now_or_never().is_none() { - log::warn!( + warn!( target: "sub-libp2p", "Improperly closed outbound notifications substream" ); @@ -281,16 +281,22 @@ impl ProtocolsHandler for NotifsOutHandler { }); self.state = State::Opening { initial_message }; }, - State::Opening { .. } | State::Refused | State::Open { .. } => - error!("Tried to enable notifications handler that was already enabled"), + st @ State::Opening { .. } | st @ State::Refused | st @ State::Open { .. } => { + debug!(target: "sub-libp2p", + "Tried to enable notifications handler that was already enabled"); + self.state = st; + } State::Poisoned => error!("Notifications handler in a poisoned state"), } } NotifsOutHandlerIn::Disable => { match mem::replace(&mut self.state, State::Poisoned) { - State::Disabled | State::DisabledOpen(_) | State::DisabledOpening => - error!("Tried to disable notifications handler that was already disabled"), + st @ State::Disabled | st @ State::DisabledOpen(_) | st @ State::DisabledOpening => { + debug!(target: "sub-libp2p", + "Tried to disable notifications handler that was already disabled"); + self.state = st; + } State::Opening { .. } => self.state = State::DisabledOpening, State::Refused => self.state = State::Disabled, State::Open { substream, .. } => self.state = State::DisabledOpen(substream), @@ -302,14 +308,14 @@ impl ProtocolsHandler for NotifsOutHandler { if let State::Open { substream, .. } = &mut self.state { if let Some(Ok(_)) = substream.send(msg).now_or_never() { } else { - log::warn!( + warn!( target: "sub-libp2p", "Failed to push message to queue, dropped it" ); } } else { // This is an API misuse. - log::warn!( + warn!( target: "sub-libp2p", "Tried to send a notification on a disabled handler" ); diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index b8436e2c7f704..e14608917713e 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -18,7 +18,7 @@ use futures::{prelude::*, ready}; use codec::{Encode, Decode}; -use libp2p::core::nodes::listeners::ListenerId; +use libp2p::core::connection::{ConnectionId, ListenerId}; use libp2p::core::ConnectedPoint; use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler}; use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction}; @@ -156,12 +156,13 @@ impl NetworkBehaviour for CustomProtoWithAddr { self.inner.inject_disconnected(peer_id, endpoint) } - fn inject_node_event( + fn inject_event( &mut self, peer_id: PeerId, + connection: ConnectionId, event: <::Handler as ProtocolsHandler>::OutEvent ) { - self.inner.inject_node_event(peer_id, event) + self.inner.inject_event(peer_id, connection, event) } fn poll( @@ -177,10 +178,6 @@ impl NetworkBehaviour for CustomProtoWithAddr { self.inner.poll(cx, params) } - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.inner.inject_replaced(peer_id, closed_endpoint, new_endpoint) - } - fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { self.inner.inject_addr_reach_failure(peer_id, addr, error) } diff --git a/client/network/src/protocol/light_client_handler.rs b/client/network/src/protocol/light_client_handler.rs index b531f3515a6ec..7457818f0737b 100644 --- a/client/network/src/protocol/light_client_handler.rs +++ b/client/network/src/protocol/light_client_handler.rs @@ -37,6 +37,7 @@ use libp2p::{ ConnectedPoint, Multiaddr, PeerId, + connection::ConnectionId, upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, upgrade::{OutboundUpgrade, read_one, write_one} }, @@ -44,6 +45,7 @@ use libp2p::{ NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, OneShotHandler, PollParameters, SubstreamProtocol @@ -678,7 +680,7 @@ where self.remove_peer(peer) } - fn inject_node_event(&mut self, peer: PeerId, event: Event) { + fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: Event) { match event { // An incoming request from remote has been received. Event::Request(request, mut stream) => { @@ -842,7 +844,11 @@ where peer: peer.clone(), }; self.outstanding.insert(id, rw); - return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id: peer, event: protocol }) + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id: peer, + handler: NotifyHandler::Any, + event: protocol + }) } } else { self.pending_requests.push_front(request); @@ -1106,6 +1112,7 @@ mod tests { Multiaddr, core::{ ConnectedPoint, + connection::ConnectionId, identity, muxing::{StreamMuxerBox, SubstreamRef}, transport::{Transport, boxed::Boxed, memory::MemoryTransport}, @@ -1262,7 +1269,7 @@ mod tests { assert_eq!(1, behaviour.pending_requests.len()); // The behaviour should now attempt to send the request. - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, .. }) => { + assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, .. }) => { assert!(peer_id == peer0 || peer_id == peer1) }); @@ -1330,7 +1337,8 @@ mod tests { } }; - behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response)); + let conn = ConnectionId::new(0); + behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, response)); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); // More progress @@ -1359,7 +1367,8 @@ mod tests { } }; - behaviour.inject_node_event(peer.clone(), Event::Response(2347895932, response)); + let conn = ConnectionId::new(0); + behaviour.inject_event(peer.clone(), conn, Event::Response(2347895932, response)); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); @@ -1401,7 +1410,8 @@ mod tests { } }; - behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response)); + let conn = ConnectionId::new(0); + behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, response)); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); // More progress @@ -1439,11 +1449,11 @@ mod tests { assert_eq!(1, behaviour.pending_requests.len()); assert_eq!(0, behaviour.outstanding.len()); - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { .. })); + assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. })); assert_eq!(0, behaviour.pending_requests.len()); assert_eq!(1, behaviour.outstanding.len()); - for _ in 0 .. 3 { + for i in 0 .. 3 { // Construct an invalid response let request_id = *behaviour.outstanding.keys().next().unwrap(); let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone(); @@ -1453,8 +1463,9 @@ mod tests { response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) } }; - behaviour.inject_node_event(responding_peer, Event::Response(request_id, response.clone())); - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { .. })); + let conn = ConnectionId::new(i); + behaviour.inject_event(responding_peer, conn, Event::Response(request_id, response.clone())); + assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. })); assert_matches!(chan.1.try_recv(), Ok(None)) } // Final invalid response @@ -1466,7 +1477,8 @@ mod tests { response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } }; - behaviour.inject_node_event(responding_peer, Event::Response(request_id, response)); + let conn = ConnectionId::new(3); + behaviour.inject_event(responding_peer, conn, Event::Response(request_id, response)); assert_matches!(poll(&mut behaviour), Poll::Pending); assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed)))) } @@ -1524,12 +1536,13 @@ mod tests { assert_eq!(1, behaviour.pending_requests.len()); assert_eq!(0, behaviour.outstanding.len()); - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { .. })); + assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. })); assert_eq!(0, behaviour.pending_requests.len()); assert_eq!(1, behaviour.outstanding.len()); assert_eq!(1, *behaviour.outstanding.keys().next().unwrap()); - behaviour.inject_node_event(peer.clone(), Event::Response(1, response)); + let conn = ConnectionId::new(0); + behaviour.inject_event(peer.clone(), conn, Event::Response(1, response)); poll(&mut behaviour);