From 867503d3f636c1b163cdef676278655414c4234e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 9 Aug 2022 21:28:32 +0300 Subject: [PATCH] Network sync refactoring (part 6) (#11940) * Extract `NetworkKVProvider` trait in `sc-authority-discovery` and remove unnecessary dependency * Extract `NetworkSyncForkRequest` trait in `sc-finality-grandpa` * Relax requirements on `SyncOracle` trait, remove extra native methods from `NetworkService` that are already provided by trait impls * Move `NetworkSigner` trait from `sc-authority-discovery` into `sc-network-common` and de-duplicate methods on `NetworkService` * Move `NetworkKVProvider` trait from `sc-authority-discovery` into `sc-network-common` and de-duplicate methods on `NetworkService` * Minimize `sc-authority-discovery` dependency on `sc-network` * Move `NetworkSyncForkRequest` trait from `sc-finality-grandpa` to `sc-network-common` and de-duplicate methods in `NetworkService` * Extract `NetworkStatusProvider` trait and de-duplicate methods on `NetworkService` * Extract `NetworkPeers` trait and de-duplicate methods on `NetworkService` * Extract `NetworkEventStream` trait and de-duplicate methods on `NetworkService` * Move more methods from `NetworkService` into `NetworkPeers` trait * Move `NetworkStateInfo` trait into `sc-network-common` * Extract `NetworkNotification` trait and de-duplicate methods on `NetworkService` * Extract `NetworkRequest` trait and de-duplicate methods on `NetworkService` * Remove `NetworkService::local_peer_id()`, it is already provided by `NetworkStateInfo` impl * Extract `NetworkTransaction` trait and de-duplicate methods on `NetworkService` * Extract `NetworkBlock` trait and de-duplicate methods on `NetworkService` * Remove dependencies on `NetworkService` from most of the methods of `sc-service` * Address simple review comments --- Cargo.lock | 11 +- bin/node/cli/Cargo.toml | 1 + bin/node/cli/src/service.rs | 3 +- client/authority-discovery/Cargo.toml | 2 +- client/authority-discovery/src/lib.rs | 3 +- client/authority-discovery/src/service.rs | 2 +- client/authority-discovery/src/tests.rs | 4 +- client/authority-discovery/src/worker.rs | 76 +- .../src/worker/addr_cache.rs | 7 +- .../src/worker/schema/tests.rs | 5 +- .../authority-discovery/src/worker/tests.rs | 63 +- client/consensus/pow/src/lib.rs | 2 +- client/consensus/slots/src/lib.rs | 2 +- .../src/communication/gossip.rs | 3 +- .../finality-grandpa/src/communication/mod.rs | 45 +- .../src/communication/tests.rs | 143 +++- client/informant/Cargo.toml | 2 +- client/informant/src/display.rs | 8 +- client/informant/src/lib.rs | 7 +- client/network-gossip/Cargo.toml | 1 + client/network-gossip/src/bridge.rs | 138 ++- client/network-gossip/src/lib.rs | 79 +- client/network-gossip/src/state_machine.rs | 126 ++- client/network-gossip/src/validator.rs | 3 +- client/network/common/Cargo.toml | 3 + client/network/common/src/lib.rs | 2 + client/network/common/src/protocol.rs | 19 + .../{ => common}/src/protocol/event.rs | 4 +- .../network/common/src/request_responses.rs | 39 +- client/network/common/src/service.rs | 660 +++++++++++++++ .../{ => common}/src/service/signature.rs | 5 +- client/network/src/behaviour.rs | 12 +- client/network/src/lib.rs | 58 +- client/network/src/protocol.rs | 3 +- client/network/src/request_responses.rs | 41 +- client/network/src/service.rs | 794 +++++++----------- client/network/src/service/out_events.rs | 3 +- client/network/src/service/tests.rs | 59 +- client/network/src/transactions.rs | 10 +- client/network/test/src/lib.rs | 16 +- client/offchain/Cargo.toml | 1 + client/offchain/src/api.rs | 79 +- client/offchain/src/lib.rs | 104 ++- client/service/src/builder.rs | 58 +- client/service/src/lib.rs | 2 + client/service/src/metrics.rs | 8 +- client/service/test/Cargo.toml | 1 + client/service/test/src/lib.rs | 9 +- primitives/consensus/common/src/lib.rs | 18 +- 49 files changed, 1802 insertions(+), 942 deletions(-) create mode 100644 client/network/common/src/protocol.rs rename client/network/{ => common}/src/protocol/event.rs (96%) create mode 100644 client/network/common/src/service.rs rename client/network/{ => common}/src/service/signature.rs (96%) diff --git a/Cargo.lock b/Cargo.lock index 703eeffce54a1..38063e8060835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4686,6 +4686,7 @@ dependencies = [ "sc-finality-grandpa", "sc-keystore", "sc-network", + "sc-network-common", "sc-rpc", "sc-service", "sc-service-test", @@ -7702,7 +7703,6 @@ dependencies = [ name = "sc-authority-discovery" version = "0.10.0-dev" dependencies = [ - "async-trait", "futures", "futures-timer", "ip_network", @@ -7715,6 +7715,7 @@ dependencies = [ "rand 0.7.3", "sc-client-api", "sc-network", + "sc-network-common", "sp-api", "sp-authority-discovery", "sp-blockchain", @@ -8316,7 +8317,7 @@ dependencies = [ "log", "parity-util-mem", "sc-client-api", - "sc-network", + "sc-network-common", "sc-transaction-pool-api", "sp-blockchain", "sp-runtime", @@ -8398,7 +8399,9 @@ dependencies = [ name = "sc-network-common" version = "0.10.0-dev" dependencies = [ + "async-trait", "bitflags", + "bytes", "futures", "libp2p", "parity-scale-codec", @@ -8409,6 +8412,7 @@ dependencies = [ "sp-consensus", "sp-finality-grandpa", "sp-runtime", + "thiserror", ] [[package]] @@ -8424,6 +8428,7 @@ dependencies = [ "lru", "quickcheck", "sc-network", + "sc-network-common", "sp-runtime", "substrate-prometheus-endpoint", "substrate-test-runtime-client", @@ -8533,6 +8538,7 @@ dependencies = [ "sc-client-api", "sc-client-db", "sc-network", + "sc-network-common", "sc-transaction-pool", "sc-transaction-pool-api", "sc-utils", @@ -8741,6 +8747,7 @@ dependencies = [ "sc-consensus", "sc-executor", "sc-network", + "sc-network-common", "sc-service", "sc-transaction-pool-api", "sp-api", diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index fe7c5332e2b45..89ca3ebb45576 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -66,6 +66,7 @@ sc-consensus = { version = "0.10.0-dev", path = "../../../client/consensus/commo sc-transaction-pool = { version = "4.0.0-dev", path = "../../../client/transaction-pool" } sc-transaction-pool-api = { version = "4.0.0-dev", path = "../../../client/transaction-pool/api" } sc-network = { version = "0.10.0-dev", path = "../../../client/network" } +sc-network-common = { version = "0.10.0-dev", path = "../../../client/network/common" } sc-consensus-slots = { version = "0.10.0-dev", path = "../../../client/consensus/slots" } sc-consensus-babe = { version = "0.10.0-dev", path = "../../../client/consensus/babe" } sc-consensus-uncles = { version = "0.10.0-dev", path = "../../../client/consensus/uncles" } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index e0644f462cf20..b20a3ac59a96a 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -29,7 +29,8 @@ use node_primitives::Block; use sc_client_api::{BlockBackend, ExecutorProvider}; use sc_consensus_babe::{self, SlotProportion}; use sc_executor::NativeElseWasmExecutor; -use sc_network::{Event, NetworkService}; +use sc_network::NetworkService; +use sc_network_common::{protocol::event::Event, service::NetworkEventStream}; use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sp_api::ProvideRuntimeApi; diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index 012e6096aab80..544df4c8d4812 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -17,7 +17,6 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.10" [dependencies] -async-trait = "0.1" codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false } futures = "0.3.21" futures-timer = "3.0.1" @@ -29,6 +28,7 @@ rand = "0.7.2" thiserror = "1.0" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network = { version = "0.10.0-dev", path = "../network" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-authority-discovery = { version = "4.0.0-dev", path = "../../primitives/authority-discovery" } diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index 8522da9984a6f..f0ef374551617 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -39,8 +39,9 @@ use futures::{ Stream, }; +use libp2p::{Multiaddr, PeerId}; use sc_client_api::blockchain::HeaderBackend; -use sc_network::{DhtEvent, Multiaddr, PeerId}; +use sc_network_common::protocol::event::DhtEvent; use sp_api::ProvideRuntimeApi; use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId}; use sp_runtime::traits::Block as BlockT; diff --git a/client/authority-discovery/src/service.rs b/client/authority-discovery/src/service.rs index c240e5d0c2287..df09b6ea43216 100644 --- a/client/authority-discovery/src/service.rs +++ b/client/authority-discovery/src/service.rs @@ -25,7 +25,7 @@ use futures::{ SinkExt, }; -use sc_network::{Multiaddr, PeerId}; +use libp2p::{Multiaddr, PeerId}; use sp_authority_discovery::AuthorityId; /// Service to interact with the [`crate::Worker`]. diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs index e9f94b6a186db..334b2638ca58c 100644 --- a/client/authority-discovery/src/tests.rs +++ b/client/authority-discovery/src/tests.rs @@ -87,12 +87,12 @@ fn get_addresses_and_authority_id() { fn cryptos_are_compatible() { use sp_core::crypto::Pair; - let libp2p_secret = sc_network::Keypair::generate_ed25519(); + let libp2p_secret = libp2p::identity::Keypair::generate_ed25519(); let libp2p_public = libp2p_secret.public(); let sp_core_secret = { let libp2p_ed_secret = match libp2p_secret.clone() { - sc_network::Keypair::Ed25519(x) => x, + libp2p::identity::Keypair::Ed25519(x) => x, _ => panic!("generate_ed25519 should have generated an Ed25519 key ¯\\_(ツ)_/¯"), }; sp_core::ed25519::Pair::from_seed_slice(&libp2p_ed_secret.secret().as_ref()).unwrap() diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index 87cc72ba7a69c..f13a1cf33581b 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -32,19 +32,22 @@ use std::{ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}; use addr_cache::AddrCache; -use async_trait::async_trait; use codec::Decode; use ip_network::IpNetwork; use libp2p::{ core::multiaddr, multihash::{Multihash, MultihashDigest}, + Multiaddr, PeerId, }; use log::{debug, error, log_enabled}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use sc_client_api::blockchain::HeaderBackend; -use sc_network::{DhtEvent, ExHashT, Multiaddr, NetworkStateInfo, PeerId}; +use sc_network_common::{ + protocol::event::DhtEvent, + service::{KademliaKey, NetworkDHTProvider, NetworkSigner, NetworkStateInfo, Signature}, +}; use sp_api::ProvideRuntimeApi; use sp_authority_discovery::{ AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature, @@ -136,7 +139,7 @@ pub struct Worker { /// Queue of throttled lookups pending to be passed to the network. pending_lookups: Vec, /// Set of in-flight lookups. - in_flight_lookups: HashMap, + in_flight_lookups: HashMap, addr_cache: addr_cache::AddrCache, @@ -464,10 +467,7 @@ where } } - fn handle_dht_value_found_event( - &mut self, - values: Vec<(sc_network::KademliaKey, Vec)>, - ) -> Result<()> { + fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec)>) -> Result<()> { // Ensure `values` is not empty and all its keys equal. let remote_key = single(values.iter().map(|(key, _)| key.clone())) .map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)? @@ -523,11 +523,11 @@ where // properly signed by the owner of the PeerId if let Some(peer_signature) = peer_signature { - let public_key = - sc_network::PublicKey::from_protobuf_encoding(&peer_signature.public_key) - .map_err(Error::ParsingLibp2pIdentity)?; - let signature = - sc_network::Signature { public_key, bytes: peer_signature.signature }; + let public_key = libp2p::identity::PublicKey::from_protobuf_encoding( + &peer_signature.public_key, + ) + .map_err(Error::ParsingLibp2pIdentity)?; + let signature = Signature { public_key, bytes: peer_signature.signature }; if !signature.verify(record, &remote_peer_id) { return Err(Error::VerifyingDhtPayload) @@ -590,55 +590,15 @@ where } } -pub trait NetworkSigner { - /// Sign a message in the name of `self.local_peer_id()` - fn sign_with_local_identity( - &self, - msg: impl AsRef<[u8]>, - ) -> std::result::Result; -} - /// NetworkProvider provides [`Worker`] with all necessary hooks into the /// underlying Substrate networking. Using this trait abstraction instead of -/// [`sc_network::NetworkService`] directly is necessary to unit test [`Worker`]. -#[async_trait] -pub trait NetworkProvider: NetworkStateInfo + NetworkSigner { - /// Start putting a value in the Dht. - fn put_value(&self, key: sc_network::KademliaKey, value: Vec); - - /// Start getting a value from the Dht. - fn get_value(&self, key: &sc_network::KademliaKey); -} +/// `sc_network::NetworkService` directly is necessary to unit test [`Worker`]. +pub trait NetworkProvider: NetworkDHTProvider + NetworkStateInfo + NetworkSigner {} -impl NetworkSigner for sc_network::NetworkService -where - B: BlockT + 'static, - H: ExHashT, -{ - fn sign_with_local_identity( - &self, - msg: impl AsRef<[u8]>, - ) -> std::result::Result { - self.sign_with_local_identity(msg) - } -} - -#[async_trait::async_trait] -impl NetworkProvider for sc_network::NetworkService -where - B: BlockT + 'static, - H: ExHashT, -{ - fn put_value(&self, key: sc_network::KademliaKey, value: Vec) { - self.put_value(key, value) - } - fn get_value(&self, key: &sc_network::KademliaKey) { - self.get_value(key) - } -} +impl NetworkProvider for T where T: NetworkDHTProvider + NetworkStateInfo + NetworkSigner {} -fn hash_authority_id(id: &[u8]) -> sc_network::KademliaKey { - sc_network::KademliaKey::new(&libp2p::multihash::Code::Sha2_256.digest(id).digest()) +fn hash_authority_id(id: &[u8]) -> KademliaKey { + KademliaKey::new(&libp2p::multihash::Code::Sha2_256.digest(id).digest()) } // Makes sure all values are the same and returns it @@ -685,7 +645,7 @@ async fn sign_record_with_authority_ids( peer_signature: Option, key_store: &dyn CryptoStore, keys: Vec, -) -> Result)>> { +) -> Result)>> { let signatures = key_store .sign_with_all(key_types::AUTHORITY_DISCOVERY, keys.clone(), &serialized_record) .await diff --git a/client/authority-discovery/src/worker/addr_cache.rs b/client/authority-discovery/src/worker/addr_cache.rs index f768b9c4e66a7..19bbbf0b62e7e 100644 --- a/client/authority-discovery/src/worker/addr_cache.rs +++ b/client/authority-discovery/src/worker/addr_cache.rs @@ -16,9 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use libp2p::core::multiaddr::{Multiaddr, Protocol}; - -use sc_network::PeerId; +use libp2p::{ + core::multiaddr::{Multiaddr, Protocol}, + PeerId, +}; use sp_authority_discovery::AuthorityId; use std::collections::{hash_map::Entry, HashMap, HashSet}; diff --git a/client/authority-discovery/src/worker/schema/tests.rs b/client/authority-discovery/src/worker/schema/tests.rs index b85a4ce37447d..60147d6762e50 100644 --- a/client/authority-discovery/src/worker/schema/tests.rs +++ b/client/authority-discovery/src/worker/schema/tests.rs @@ -21,9 +21,8 @@ mod schema_v1 { } use super::*; -use libp2p::multiaddr::Multiaddr; +use libp2p::{multiaddr::Multiaddr, PeerId}; use prost::Message; -use sc_network::PeerId; #[test] fn v2_decodes_v1() { @@ -56,7 +55,7 @@ fn v2_decodes_v1() { #[test] fn v1_decodes_v2() { - let peer_secret = sc_network::Keypair::generate_ed25519(); + let peer_secret = libp2p::identity::Keypair::generate_ed25519(); let peer_public = peer_secret.public(); let peer_id = peer_public.to_peer_id(); let multiaddress: Multiaddr = diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index a1a699bc30dd2..5a60d3353db52 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -22,7 +22,6 @@ use std::{ task::Poll, }; -use async_trait::async_trait; use futures::{ channel::mpsc::{self, channel}, executor::{block_on, LocalPool}, @@ -30,9 +29,10 @@ use futures::{ sink::SinkExt, task::LocalSpawn, }; -use libp2p::{core::multiaddr, PeerId}; +use libp2p::{core::multiaddr, identity::Keypair, PeerId}; use prometheus_endpoint::prometheus::default_registry; +use sc_network_common::service::{KademliaKey, Signature, SigningError}; use sp_api::{ApiRef, ProvideRuntimeApi}; use sp_keystore::{testing::KeyStore, CryptoStore}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; @@ -111,18 +111,18 @@ sp_api::mock_impl_runtime_apis! { #[derive(Debug)] pub enum TestNetworkEvent { - GetCalled(sc_network::KademliaKey), - PutCalled(sc_network::KademliaKey, Vec), + GetCalled(KademliaKey), + PutCalled(KademliaKey, Vec), } pub struct TestNetwork { peer_id: PeerId, - identity: sc_network::Keypair, + identity: Keypair, external_addresses: Vec, // Whenever functions on `TestNetwork` are called, the function arguments are added to the // vectors below. - pub put_value_call: Arc)>>>, - pub get_value_call: Arc>>, + pub put_value_call: Arc)>>>, + pub get_value_call: Arc>>, event_sender: mpsc::UnboundedSender, event_receiver: Option>, } @@ -136,7 +136,7 @@ impl TestNetwork { impl Default for TestNetwork { fn default() -> Self { let (tx, rx) = mpsc::unbounded(); - let identity = sc_network::Keypair::generate_ed25519(); + let identity = Keypair::generate_ed25519(); TestNetwork { peer_id: identity.public().to_peer_id(), identity, @@ -153,21 +153,20 @@ impl NetworkSigner for TestNetwork { fn sign_with_local_identity( &self, msg: impl AsRef<[u8]>, - ) -> std::result::Result { - sc_network::Signature::sign_message(msg, &self.identity) + ) -> std::result::Result { + Signature::sign_message(msg, &self.identity) } } -#[async_trait] -impl NetworkProvider for TestNetwork { - fn put_value(&self, key: sc_network::KademliaKey, value: Vec) { +impl NetworkDHTProvider for TestNetwork { + fn put_value(&self, key: KademliaKey, value: Vec) { self.put_value_call.lock().unwrap().push((key.clone(), value.clone())); self.event_sender .clone() .unbounded_send(TestNetworkEvent::PutCalled(key, value)) .unwrap(); } - fn get_value(&self, key: &sc_network::KademliaKey) { + fn get_value(&self, key: &KademliaKey) { self.get_value_call.lock().unwrap().push(key.clone()); self.event_sender .clone() @@ -186,12 +185,16 @@ impl NetworkStateInfo for TestNetwork { } } -impl NetworkSigner for sc_network::Keypair { +struct TestSigner<'a> { + keypair: &'a Keypair, +} + +impl<'a> NetworkSigner for TestSigner<'a> { fn sign_with_local_identity( &self, msg: impl AsRef<[u8]>, - ) -> std::result::Result { - sc_network::Signature::sign_message(msg, self) + ) -> std::result::Result { + Signature::sign_message(msg, self.keypair) } } @@ -200,7 +203,7 @@ async fn build_dht_event( public_key: AuthorityId, key_store: &dyn CryptoStore, network: Option<&Signer>, -) -> Vec<(sc_network::KademliaKey, Vec)> { +) -> Vec<(KademliaKey, Vec)> { let serialized_record = serialize_authority_record(serialize_addresses(addresses.into_iter())).unwrap(); @@ -313,7 +316,7 @@ fn publish_discover_cycle() { let dht_event = { let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap(); - sc_network::DhtEvent::ValueFound(vec![(key, value)]) + DhtEvent::ValueFound(vec![(key, value)]) }; // Node B discovering node A's address. @@ -469,7 +472,7 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { None, ) .await; - sc_network::DhtEvent::ValueFound(kv_pairs) + DhtEvent::ValueFound(kv_pairs) }; dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1."); @@ -487,7 +490,7 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { struct DhtValueFoundTester { pub remote_key_store: KeyStore, pub remote_authority_public: sp_core::sr25519::Public, - pub remote_node_key: sc_network::Keypair, + pub remote_node_key: Keypair, pub local_worker: Option< Worker< TestApi, @@ -496,7 +499,7 @@ struct DhtValueFoundTester { sp_runtime::generic::Header, substrate_test_runtime_client::runtime::Extrinsic, >, - std::pin::Pin>>, + std::pin::Pin>>, >, >, } @@ -508,7 +511,7 @@ impl DhtValueFoundTester { block_on(remote_key_store.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)) .unwrap(); - let remote_node_key = sc_network::Keypair::generate_ed25519(); + let remote_node_key = Keypair::generate_ed25519(); Self { remote_key_store, remote_authority_public, remote_node_key, local_worker: None } } @@ -523,7 +526,7 @@ impl DhtValueFoundTester { fn process_value_found( &mut self, strict_record_validation: bool, - values: Vec<(sc_network::KademliaKey, Vec)>, + values: Vec<(KademliaKey, Vec)>, ) -> Option<&HashSet> { let (_dht_event_tx, dht_event_rx) = channel(1); let local_test_api = @@ -583,7 +586,7 @@ fn strict_accept_address_with_peer_signature() { vec![addr.clone()], tester.remote_authority_public.clone().into(), &tester.remote_key_store, - Some(&tester.remote_node_key), + Some(&TestSigner { keypair: &tester.remote_node_key }), )); let cached_remote_addresses = tester.process_value_found(true, kv_pairs); @@ -598,12 +601,12 @@ fn strict_accept_address_with_peer_signature() { #[test] fn reject_address_with_rogue_peer_signature() { let mut tester = DhtValueFoundTester::new(); - let rogue_remote_node_key = sc_network::Keypair::generate_ed25519(); + let rogue_remote_node_key = Keypair::generate_ed25519(); let kv_pairs = block_on(build_dht_event( vec![tester.multiaddr_with_peer_id(1)], tester.remote_authority_public.clone().into(), &tester.remote_key_store, - Some(&rogue_remote_node_key), + Some(&TestSigner { keypair: &rogue_remote_node_key }), )); let cached_remote_addresses = tester.process_value_found(false, kv_pairs); @@ -621,7 +624,7 @@ fn reject_address_with_invalid_peer_signature() { vec![tester.multiaddr_with_peer_id(1)], tester.remote_authority_public.clone().into(), &tester.remote_key_store, - Some(&tester.remote_node_key), + Some(&TestSigner { keypair: &tester.remote_node_key }), )); // tamper with the signature let mut record = schema::SignedAuthorityRecord::decode(kv_pairs[0].1.as_slice()).unwrap(); @@ -808,7 +811,7 @@ fn lookup_throttling() { None, ) .await; - sc_network::DhtEvent::ValueFound(kv_pairs) + DhtEvent::ValueFound(kv_pairs) }; dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1."); @@ -822,7 +825,7 @@ fn lookup_throttling() { // Make second one fail. let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap(); - let dht_event = sc_network::DhtEvent::ValueNotFound(remote_hash); + let dht_event = DhtEvent::ValueNotFound(remote_hash); dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1."); // Assert worker to trigger another lookup. diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index 6f9ee6f864ad8..f63e453a48026 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -518,7 +518,7 @@ pub fn start_mining_worker( select_chain: S, algorithm: Algorithm, mut env: E, - mut sync_oracle: SO, + sync_oracle: SO, justification_sync_link: L, pre_runtime: Option>, create_inherent_data_providers: CIDP, diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index 7c3de13444c1a..39b40a32f18ca 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -490,7 +490,7 @@ pub async fn start_slot_worker( slot_duration: SlotDuration, client: C, mut worker: W, - mut sync_oracle: SO, + sync_oracle: SO, create_inherent_data_providers: CIDP, can_author_with: CAW, ) where diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 65d7dfb783aa3..9e9f2fb98b0d1 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -89,7 +89,8 @@ use log::{debug, trace}; use parity_scale_codec::{Decode, Encode}; use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64}; use rand::seq::SliceRandom; -use sc_network::{ObservedRole, PeerId, ReputationChange}; +use sc_network::{PeerId, ReputationChange}; +use sc_network_common::protocol::event::ObservedRole; use sc_network_gossip::{MessageIntent, ValidatorContext}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 378501cffdd62..7a47ebe214950 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -45,7 +45,7 @@ use finality_grandpa::{ Message::{Precommit, Prevote, PrimaryPropose}, }; use parity_scale_codec::{Decode, Encode}; -use sc_network::{NetworkService, ReputationChange}; +use sc_network::ReputationChange; use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO}; use sp_keystore::SyncCryptoStorePtr; @@ -58,6 +58,7 @@ use crate::{ use gossip::{ FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage, }; +use sc_network_common::service::{NetworkBlock, NetworkSyncForkRequest}; use sc_utils::mpsc::TracingUnboundedReceiver; use sp_finality_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber}; @@ -156,34 +157,26 @@ const TELEMETRY_VOTERS_LIMIT: usize = 10; /// /// Something that provides both the capabilities needed for the `gossip_network::Network` trait as /// well as the ability to set a fork sync request for a particular block. -pub trait Network: GossipNetwork + Clone + Send + 'static { - /// Notifies the sync service to try and sync the given block from the given - /// peers. - /// - /// If the given vector of peers is empty then the underlying implementation - /// should make a best effort to fetch the block from any peers it is - /// connected to (NOTE: this assumption will change in the future #3629). - fn set_sync_fork_request( - &self, - peers: Vec, - hash: Block::Hash, - number: NumberFor, - ); +pub trait Network: + NetworkSyncForkRequest> + + NetworkBlock> + + GossipNetwork + + Clone + + Send + + 'static +{ } -impl Network for Arc> +impl Network for T where - B: BlockT, - H: sc_network::ExHashT, + Block: BlockT, + T: NetworkSyncForkRequest> + + NetworkBlock> + + GossipNetwork + + Clone + + Send + + 'static, { - fn set_sync_fork_request( - &self, - peers: Vec, - hash: B::Hash, - number: NumberFor, - ) { - NetworkService::set_sync_fork_request(self, peers, hash, number) - } } /// Create a unique topic for a round and set-id combo. @@ -467,7 +460,7 @@ impl> NetworkBridge { hash: B::Hash, number: NumberFor, ) { - Network::set_sync_fork_request(&self.service, peers, hash, number) + self.service.set_sync_fork_request(peers, hash, number) } } diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index 0ec5092a2a047..59935cef6a095 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -25,7 +25,14 @@ use super::{ use crate::{communication::grandpa_protocol_name, environment::SharedVoterSetState}; use futures::prelude::*; use parity_scale_codec::Encode; -use sc_network::{config::Role, Event as NetworkEvent, ObservedRole, PeerId}; +use sc_network::{config::Role, Multiaddr, PeerId, ReputationChange}; +use sc_network_common::{ + protocol::event::{Event as NetworkEvent, ObservedRole}, + service::{ + NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, + NetworkSyncForkRequest, NotificationSender, NotificationSenderError, + }, +}; use sc_network_gossip::Validator; use sc_network_test::{Block, Hash}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -34,6 +41,7 @@ use sp_keyring::Ed25519Keyring; use sp_runtime::traits::NumberFor; use std::{ borrow::Cow, + collections::HashSet, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -42,8 +50,8 @@ use std::{ #[derive(Debug)] pub(crate) enum Event { EventStream(TracingUnboundedSender), - WriteNotification(sc_network::PeerId, Vec), - Report(sc_network::PeerId, sc_network::ReputationChange), + WriteNotification(PeerId, Vec), + Report(PeerId, ReputationChange), Announce(Hash), } @@ -52,49 +60,122 @@ pub(crate) struct TestNetwork { sender: TracingUnboundedSender, } -impl sc_network_gossip::Network for TestNetwork { - fn event_stream(&self) -> Pin + Send>> { - let (tx, rx) = tracing_unbounded("test"); - let _ = self.sender.unbounded_send(Event::EventStream(tx)); - Box::pin(rx) +impl NetworkPeers for TestNetwork { + fn set_authorized_peers(&self, _peers: HashSet) { + unimplemented!(); + } + + fn set_authorized_only(&self, _reserved_only: bool) { + unimplemented!(); + } + + fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) { + unimplemented!(); } - fn report_peer(&self, who: sc_network::PeerId, cost_benefit: sc_network::ReputationChange) { + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); } - fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {} + fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) {} + + fn accept_unreserved_peers(&self) { + unimplemented!(); + } + + fn deny_unreserved_peers(&self) { + unimplemented!(); + } + + fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { + unimplemented!(); + } + + fn remove_reserved_peer(&self, _peer_id: PeerId) { + unimplemented!(); + } + + fn set_reserved_peers( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn add_peers_to_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_peers_from_reserved_set(&self, _protocol: Cow<'static, str>, _peers: Vec) {} - fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {} + fn add_to_peers_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } - fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) {} + fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec) { + unimplemented!(); + } - fn write_notification(&self, who: PeerId, _: Cow<'static, str>, message: Vec) { - let _ = self.sender.unbounded_send(Event::WriteNotification(who, message)); + fn sync_num_connected(&self) -> usize { + unimplemented!(); } +} - fn announce(&self, block: Hash, _associated_data: Option>) { - let _ = self.sender.unbounded_send(Event::Announce(block)); +impl NetworkEventStream for TestNetwork { + fn event_stream( + &self, + _name: &'static str, + ) -> Pin + Send>> { + let (tx, rx) = tracing_unbounded("test"); + let _ = self.sender.unbounded_send(Event::EventStream(tx)); + Box::pin(rx) } } -impl super::Network for TestNetwork { - fn set_sync_fork_request( +impl NetworkNotification for TestNetwork { + fn write_notification(&self, target: PeerId, _protocol: Cow<'static, str>, message: Vec) { + let _ = self.sender.unbounded_send(Event::WriteNotification(target, message)); + } + + fn notification_sender( &self, - _peers: Vec, - _hash: Hash, - _number: NumberFor, - ) { + _target: PeerId, + _protocol: Cow<'static, str>, + ) -> Result, NotificationSenderError> { + unimplemented!(); + } +} + +impl NetworkBlock> for TestNetwork { + fn announce_block(&self, hash: Hash, _data: Option>) { + let _ = self.sender.unbounded_send(Event::Announce(hash)); + } + + fn new_best_block_imported(&self, _hash: Hash, _number: NumberFor) { + unimplemented!(); } } +impl NetworkSyncForkRequest> for TestNetwork { + fn set_sync_fork_request(&self, _peers: Vec, _hash: Hash, _number: NumberFor) {} +} + impl sc_network_gossip::ValidatorContext for TestNetwork { fn broadcast_topic(&mut self, _: Hash, _: bool) {} fn broadcast_message(&mut self, _: Hash, _: Vec, _: bool) {} - fn send_message(&mut self, who: &sc_network::PeerId, data: Vec) { - >::write_notification( + fn send_message(&mut self, who: &PeerId, data: Vec) { + ::write_notification( self, who.clone(), grandpa_protocol_name::NAME.into(), @@ -102,7 +183,7 @@ impl sc_network_gossip::ValidatorContext for TestNetwork { ); } - fn send_topic(&mut self, _: &sc_network::PeerId, _: Hash, _: bool) {} + fn send_topic(&mut self, _: &PeerId, _: Hash, _: bool) {} } pub(crate) struct Tester { @@ -207,8 +288,8 @@ struct NoopContext; impl sc_network_gossip::ValidatorContext for NoopContext { fn broadcast_topic(&mut self, _: Hash, _: bool) {} fn broadcast_message(&mut self, _: Hash, _: Vec, _: bool) {} - fn send_message(&mut self, _: &sc_network::PeerId, _: Vec) {} - fn send_topic(&mut self, _: &sc_network::PeerId, _: Hash, _: bool) {} + fn send_message(&mut self, _: &PeerId, _: Vec) {} + fn send_topic(&mut self, _: &PeerId, _: Hash, _: bool) {} } #[test] @@ -252,7 +333,7 @@ fn good_commit_leads_to_relay() { }) .encode(); - let id = sc_network::PeerId::random(); + let id = PeerId::random(); let global_topic = super::global_topic::(set_id); let test = make_test_network() @@ -301,7 +382,7 @@ fn good_commit_leads_to_relay() { }); // Add a random peer which will be the recipient of this message - let receiver_id = sc_network::PeerId::random(); + let receiver_id = PeerId::random(); let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: receiver_id.clone(), protocol: grandpa_protocol_name::NAME.into(), @@ -403,7 +484,7 @@ fn bad_commit_leads_to_report() { }) .encode(); - let id = sc_network::PeerId::random(); + let id = PeerId::random(); let global_topic = super::global_topic::(set_id); let test = make_test_network() @@ -484,7 +565,7 @@ fn bad_commit_leads_to_report() { #[test] fn peer_with_higher_view_leads_to_catch_up_request() { - let id = sc_network::PeerId::random(); + let id = PeerId::random(); let (tester, mut net) = make_test_network(); let test = tester diff --git a/client/informant/Cargo.toml b/client/informant/Cargo.toml index 528365d62c18b..b3ac5d892fd58 100644 --- a/client/informant/Cargo.toml +++ b/client/informant/Cargo.toml @@ -19,7 +19,7 @@ futures-timer = "3.0.1" log = "0.4.17" parity-util-mem = { version = "0.11.0", default-features = false, features = ["primitive-types"] } sc-client-api = { version = "4.0.0-dev", path = "../api" } -sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-transaction-pool-api = { version = "4.0.0-dev", path = "../transaction-pool/api" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } diff --git a/client/informant/src/display.rs b/client/informant/src/display.rs index 446ddf47b4cab..0441011b0ec42 100644 --- a/client/informant/src/display.rs +++ b/client/informant/src/display.rs @@ -20,7 +20,13 @@ use crate::OutputFormat; use ansi_term::Colour; use log::info; use sc_client_api::ClientInfo; -use sc_network::{NetworkStatus, SyncState, WarpSyncPhase, WarpSyncProgress}; +use sc_network_common::{ + service::NetworkStatus, + sync::{ + warp::{WarpSyncPhase, WarpSyncProgress}, + SyncState, + }, +}; use sp_runtime::traits::{Block as BlockT, CheckedDiv, NumberFor, Saturating, Zero}; use std::{fmt, time::Instant}; diff --git a/client/informant/src/lib.rs b/client/informant/src/lib.rs index 5dca77f1a7433..52f1c95fe0198 100644 --- a/client/informant/src/lib.rs +++ b/client/informant/src/lib.rs @@ -24,7 +24,7 @@ use futures_timer::Delay; use log::{debug, info, trace}; use parity_util_mem::MallocSizeOf; use sc_client_api::{BlockchainEvents, UsageProvider}; -use sc_network::NetworkService; +use sc_network_common::service::NetworkStatusProvider; use sc_transaction_pool_api::TransactionPool; use sp_blockchain::HeaderMetadata; use sp_runtime::traits::{Block as BlockT, Header}; @@ -53,12 +53,13 @@ impl Default for OutputFormat { } /// Builds the informant and returns a `Future` that drives the informant. -pub async fn build( +pub async fn build( client: Arc, - network: Arc::Hash>>, + network: N, pool: Arc

, format: OutputFormat, ) where + N: NetworkStatusProvider, C: UsageProvider + HeaderMetadata + BlockchainEvents, >::Error: Display, P: TransactionPool + MallocSizeOf, diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 144c5ad168996..185c37985b585 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -23,6 +23,7 @@ lru = "0.7.5" tracing = "0.1.29" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } [dev-dependencies] diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 2d086e89b4a10..8a6c3358e4409 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -21,7 +21,8 @@ use crate::{ Network, Validator, }; -use sc_network::{Event, ReputationChange}; +use sc_network::ReputationChange; +use sc_network_common::protocol::event::Event; use futures::{ channel::mpsc::{channel, Receiver, Sender}, @@ -85,7 +86,7 @@ impl GossipEngine { B: 'static, { let protocol = protocol.into(); - let network_event_stream = network.event_stream(); + let network_event_stream = network.event_stream("network-gossip"); GossipEngine { state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry), @@ -162,7 +163,7 @@ impl GossipEngine { /// Note: this method isn't strictly related to gossiping and should eventually be moved /// somewhere else. pub fn announce(&self, block: B::Hash, associated_data: Option>) { - self.network.announce(block, associated_data); + self.network.announce_block(block, associated_data); } } @@ -181,7 +182,10 @@ impl Future for GossipEngine { this.network.add_set_reserved(remote, this.protocol.clone()); }, Event::SyncDisconnected { remote } => { - this.network.remove_set_reserved(remote, this.protocol.clone()); + this.network.remove_peers_from_reserved_set( + this.protocol.clone(), + vec![remote], + ); }, Event::NotificationStreamOpened { remote, protocol, role, .. } => { if protocol != this.protocol { @@ -304,7 +308,7 @@ impl futures::future::FusedFuture for GossipEngine { #[cfg(test)] mod tests { use super::*; - use crate::{ValidationResult, ValidatorContext}; + use crate::{multiaddr::Multiaddr, ValidationResult, ValidatorContext}; use async_std::task::spawn; use futures::{ channel::mpsc::{unbounded, UnboundedSender}, @@ -312,10 +316,20 @@ mod tests { future::poll_fn, }; use quickcheck::{Arbitrary, Gen, QuickCheck}; - use sc_network::ObservedRole; - use sp_runtime::{testing::H256, traits::Block as BlockT}; + use sc_network_common::{ + protocol::event::ObservedRole, + service::{ + NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, + NotificationSender, NotificationSenderError, + }, + }; + use sp_runtime::{ + testing::H256, + traits::{Block as BlockT, NumberFor}, + }; use std::{ borrow::Cow, + collections::HashSet, sync::{Arc, Mutex}, }; use substrate_test_runtime_client::runtime::Block; @@ -330,29 +344,119 @@ mod tests { event_senders: Vec>, } - impl Network for TestNetwork { - fn event_stream(&self) -> Pin + Send>> { + impl NetworkPeers for TestNetwork { + fn set_authorized_peers(&self, _peers: HashSet) { + unimplemented!(); + } + + fn set_authorized_only(&self, _reserved_only: bool) { + unimplemented!(); + } + + fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) { + unimplemented!(); + } + + fn report_peer(&self, _who: PeerId, _cost_benefit: ReputationChange) {} + + fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) { + unimplemented!(); + } + + fn accept_unreserved_peers(&self) { + unimplemented!(); + } + + fn deny_unreserved_peers(&self) { + unimplemented!(); + } + + fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { + unimplemented!(); + } + + fn remove_reserved_peer(&self, _peer_id: PeerId) { + unimplemented!(); + } + + fn set_reserved_peers( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn add_peers_to_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_peers_from_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: Vec, + ) { + } + + fn add_to_peers_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec) { + unimplemented!(); + } + + fn sync_num_connected(&self) -> usize { + unimplemented!(); + } + } + + impl NetworkEventStream for TestNetwork { + fn event_stream(&self, _name: &'static str) -> Pin + Send>> { let (tx, rx) = unbounded(); self.inner.lock().unwrap().event_senders.push(tx); Box::pin(rx) } + } - fn report_peer(&self, _: PeerId, _: ReputationChange) {} - - fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) { + impl NetworkNotification for TestNetwork { + fn write_notification( + &self, + _target: PeerId, + _protocol: Cow<'static, str>, + _message: Vec, + ) { unimplemented!(); } - fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {} - - fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {} + fn notification_sender( + &self, + _target: PeerId, + _protocol: Cow<'static, str>, + ) -> Result, NotificationSenderError> { + unimplemented!(); + } + } - fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec) { + impl NetworkBlock<::Hash, NumberFor> for TestNetwork { + fn announce_block(&self, _hash: ::Hash, _data: Option>) { unimplemented!(); } - fn announce(&self, _: B::Hash, _: Option>) { + fn new_best_block_imported( + &self, + _hash: ::Hash, + _number: NumberFor, + ) { unimplemented!(); } } diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index 4b83708702466..b9dff0bcd4d00 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -41,9 +41,9 @@ //! messages. //! //! The [`GossipEngine`] will automatically use [`Network::add_set_reserved`] and -//! [`Network::remove_set_reserved`] to maintain a set of peers equal to the set of peers the -//! node is syncing from. See the documentation of `sc-network` for more explanations about the -//! concepts of peer sets. +//! [`NetworkPeers::remove_peers_from_reserved_set`] to maintain a set of peers equal to the set of +//! peers the node is syncing from. See the documentation of `sc-network` for more explanations +//! about the concepts of peer sets. //! //! # What is a validator? //! @@ -67,74 +67,35 @@ pub use self::{ validator::{DiscardAll, MessageIntent, ValidationResult, Validator, ValidatorContext}, }; -use futures::prelude::*; -use sc_network::{multiaddr, Event, ExHashT, NetworkService, PeerId, ReputationChange}; -use sp_runtime::traits::Block as BlockT; -use std::{borrow::Cow, iter, pin::Pin, sync::Arc}; +use sc_network::{multiaddr, PeerId}; +use sc_network_common::service::{ + NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, +}; +use sp_runtime::traits::{Block as BlockT, NumberFor}; +use std::{borrow::Cow, iter}; mod bridge; mod state_machine; mod validator; /// Abstraction over a network. -pub trait Network { - /// Returns a stream of events representing what happens on the network. - fn event_stream(&self) -> Pin + Send>>; - - /// Adjust the reputation of a node. - fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange); - - /// Adds the peer to the set of peers to be connected to with this protocol. - fn add_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>); - - /// Removes the peer from the set of peers to be connected to with this protocol. - fn remove_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>); - - /// Force-disconnect a peer. - fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>); - - /// Send a notification to a peer. - fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec); - - /// Notify everyone we're connected to that we have the given block. - /// - /// Note: this method isn't strictly related to gossiping and should eventually be moved - /// somewhere else. - fn announce(&self, block: B::Hash, associated_data: Option>); -} - -impl Network for Arc> { - fn event_stream(&self) -> Pin + Send>> { - Box::pin(NetworkService::event_stream(self, "network-gossip")) - } - - fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) { - NetworkService::report_peer(self, peer_id, reputation); - } - +pub trait Network: + NetworkPeers + NetworkEventStream + NetworkNotification + NetworkBlock> +{ fn add_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>) { let addr = iter::once(multiaddr::Protocol::P2p(who.into())).collect::(); - let result = - NetworkService::add_peers_to_reserved_set(self, protocol, iter::once(addr).collect()); + let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect()); if let Err(err) = result { log::error!(target: "gossip", "add_set_reserved failed: {}", err); } } +} - fn remove_set_reserved(&self, who: PeerId, protocol: Cow<'static, str>) { - NetworkService::remove_peers_from_reserved_set(self, protocol, iter::once(who).collect()); - } - - fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) { - NetworkService::disconnect_peer(self, who, protocol) - } - - fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec) { - NetworkService::write_notification(self, who, protocol, message) - } - - fn announce(&self, block: B::Hash, associated_data: Option>) { - NetworkService::announce_block(self, block, associated_data) - } +impl Network for T where + T: NetworkPeers + + NetworkEventStream + + NetworkNotification + + NetworkBlock> +{ } diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 8a016cbaab3da..4cc4e25529af4 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -22,7 +22,7 @@ use ahash::AHashSet; use libp2p::PeerId; use lru::LruCache; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; -use sc_network::ObservedRole; +use sc_network_common::protocol::event::ObservedRole; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use std::{borrow::Cow, collections::HashMap, iter, sync::Arc, time, time::Instant}; @@ -511,11 +511,23 @@ impl Metrics { #[cfg(test)] mod tests { use super::*; + use crate::multiaddr::Multiaddr; use futures::prelude::*; - use sc_network::{Event, ReputationChange}; - use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper, H256}; + use sc_network::ReputationChange; + use sc_network_common::{ + protocol::event::Event, + service::{ + NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, + NotificationSender, NotificationSenderError, + }, + }; + use sp_runtime::{ + testing::{Block as RawBlock, ExtrinsicWrapper, H256}, + traits::NumberFor, + }; use std::{ borrow::Cow, + collections::HashSet, pin::Pin, sync::{Arc, Mutex}, }; @@ -569,28 +581,118 @@ mod tests { peer_reports: Vec<(PeerId, ReputationChange)>, } - impl Network for NoOpNetwork { - fn event_stream(&self) -> Pin + Send>> { + impl NetworkPeers for NoOpNetwork { + fn set_authorized_peers(&self, _peers: HashSet) { + unimplemented!(); + } + + fn set_authorized_only(&self, _reserved_only: bool) { + unimplemented!(); + } + + fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) { + unimplemented!(); + } + + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { + self.inner.lock().unwrap().peer_reports.push((who, cost_benefit)); + } + + fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) { + unimplemented!(); + } + + fn accept_unreserved_peers(&self) { + unimplemented!(); + } + + fn deny_unreserved_peers(&self) { + unimplemented!(); + } + + fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { + unimplemented!(); + } + + fn remove_reserved_peer(&self, _peer_id: PeerId) { + unimplemented!(); + } + + fn set_reserved_peers( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { unimplemented!(); } - fn report_peer(&self, peer_id: PeerId, reputation_change: ReputationChange) { - self.inner.lock().unwrap().peer_reports.push((peer_id, reputation_change)); + fn add_peers_to_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_peers_from_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: Vec, + ) { } - fn disconnect_peer(&self, _: PeerId, _: Cow<'static, str>) { + fn add_to_peers_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { unimplemented!(); } - fn add_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {} + fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec) { + unimplemented!(); + } - fn remove_set_reserved(&self, _: PeerId, _: Cow<'static, str>) {} + fn sync_num_connected(&self) -> usize { + unimplemented!(); + } + } - fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec) { + impl NetworkEventStream for NoOpNetwork { + fn event_stream(&self, _name: &'static str) -> Pin + Send>> { unimplemented!(); } + } - fn announce(&self, _: B::Hash, _: Option>) { + impl NetworkNotification for NoOpNetwork { + fn write_notification( + &self, + _target: PeerId, + _protocol: Cow<'static, str>, + _message: Vec, + ) { + unimplemented!(); + } + + fn notification_sender( + &self, + _target: PeerId, + _protocol: Cow<'static, str>, + ) -> Result, NotificationSenderError> { + unimplemented!(); + } + } + + impl NetworkBlock<::Hash, NumberFor> for NoOpNetwork { + fn announce_block(&self, _hash: ::Hash, _data: Option>) { + unimplemented!(); + } + + fn new_best_block_imported( + &self, + _hash: ::Hash, + _number: NumberFor, + ) { unimplemented!(); } } diff --git a/client/network-gossip/src/validator.rs b/client/network-gossip/src/validator.rs index 7d60f7b31397f..a98c62ab5a9eb 100644 --- a/client/network-gossip/src/validator.rs +++ b/client/network-gossip/src/validator.rs @@ -16,7 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use sc_network::{ObservedRole, PeerId}; +use sc_network::PeerId; +use sc_network_common::protocol::event::ObservedRole; use sp_runtime::traits::Block as BlockT; /// Validates consensus messages. diff --git a/client/network/common/Cargo.toml b/client/network/common/Cargo.toml index b0e3a8fe42a83..1b10bd248292c 100644 --- a/client/network/common/Cargo.toml +++ b/client/network/common/Cargo.toml @@ -17,7 +17,9 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.10" [dependencies] +async-trait = "0.1.50" bitflags = "1.3.2" +bytes = "1" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive", ] } @@ -29,3 +31,4 @@ sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/finality-grandpa" } sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } +thiserror = "1.0" diff --git a/client/network/common/src/lib.rs b/client/network/common/src/lib.rs index 9fbedc542c123..3a30d24900199 100644 --- a/client/network/common/src/lib.rs +++ b/client/network/common/src/lib.rs @@ -20,5 +20,7 @@ pub mod config; pub mod message; +pub mod protocol; pub mod request_responses; +pub mod service; pub mod sync; diff --git a/client/network/common/src/protocol.rs b/client/network/common/src/protocol.rs new file mode 100644 index 0000000000000..0fd36cb511112 --- /dev/null +++ b/client/network/common/src/protocol.rs @@ -0,0 +1,19 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +pub mod event; diff --git a/client/network/src/protocol/event.rs b/client/network/common/src/protocol/event.rs similarity index 96% rename from client/network/src/protocol/event.rs rename to client/network/common/src/protocol/event.rs index 26c9544960605..c6fb4a2faf9f9 100644 --- a/client/network/src/protocol/event.rs +++ b/client/network/common/src/protocol/event.rs @@ -67,14 +67,14 @@ pub enum Event { remote: PeerId, /// The concerned protocol. Each protocol uses a different substream. /// This is always equal to the value of - /// [`crate::config::NonDefaultSetConfig::notifications_protocol`] of one of the + /// `sc_network::config::NonDefaultSetConfig::notifications_protocol` of one of the /// configured sets. protocol: Cow<'static, str>, /// If the negotiation didn't use the main name of the protocol (the one in /// `notifications_protocol`), then this field contains which name has actually been /// used. /// Always contains a value equal to the value in - /// [`crate::config::NonDefaultSetConfig::fallback_names`]. + /// `sc_network::config::NonDefaultSetConfig::fallback_names`. negotiated_fallback: Option>, /// Role of the remote. role: ObservedRole, diff --git a/client/network/common/src/request_responses.rs b/client/network/common/src/request_responses.rs index f409d1ac16d61..a216c9c2d254d 100644 --- a/client/network/common/src/request_responses.rs +++ b/client/network/common/src/request_responses.rs @@ -19,7 +19,7 @@ //! Collection of generic data structures for request-response protocols. use futures::channel::{mpsc, oneshot}; -use libp2p::PeerId; +use libp2p::{request_response::OutboundFailure, PeerId}; use sc_peerset::ReputationChange; use std::{borrow::Cow, time::Duration}; @@ -115,3 +115,40 @@ pub struct OutgoingResponse { /// > written to the buffer managed by the operating system. pub sent_feedback: Option>, } + +/// When sending a request, what to do on a disconnected recipient. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub enum IfDisconnected { + /// Try to connect to the peer. + TryConnect, + /// Just fail if the destination is not yet connected. + ImmediateError, +} + +/// Convenience functions for `IfDisconnected`. +impl IfDisconnected { + /// Shall we connect to a disconnected peer? + pub fn should_connect(self) -> bool { + match self { + Self::TryConnect => true, + Self::ImmediateError => false, + } + } +} + +/// Error in a request. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum RequestFailure { + #[error("We are not currently connected to the requested peer.")] + NotConnected, + #[error("Given protocol hasn't been registered.")] + UnknownProtocol, + #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")] + Refused, + #[error("The remote replied, but the local node is no longer interested in the response.")] + Obsolete, + /// Problem on the network. + #[error("Problem on the network: {0}")] + Network(OutboundFailure), +} diff --git a/client/network/common/src/service.rs b/client/network/common/src/service.rs new file mode 100644 index 0000000000000..0fff32b12d66c --- /dev/null +++ b/client/network/common/src/service.rs @@ -0,0 +1,660 @@ +// This file is part of Substrate. +// +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// If you read this, you are very thorough, congratulations. + +use crate::{ + protocol::event::Event, + request_responses::{IfDisconnected, RequestFailure}, + sync::{warp::WarpSyncProgress, StateDownloadProgress, SyncState}, +}; +use futures::{channel::oneshot, Stream}; +pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey}; +use libp2p::{Multiaddr, PeerId}; +use sc_peerset::ReputationChange; +pub use signature::Signature; +use sp_runtime::traits::{Block as BlockT, NumberFor}; +use std::{borrow::Cow, collections::HashSet, future::Future, pin::Pin, sync::Arc}; + +mod signature; + +/// Signer with network identity +pub trait NetworkSigner { + /// Signs the message with the `KeyPair` that defines the local [`PeerId`]. + fn sign_with_local_identity(&self, msg: impl AsRef<[u8]>) -> Result; +} + +impl NetworkSigner for Arc +where + T: ?Sized, + T: NetworkSigner, +{ + fn sign_with_local_identity(&self, msg: impl AsRef<[u8]>) -> Result { + T::sign_with_local_identity(self, msg) + } +} + +/// Provides access to the networking DHT. +pub trait NetworkDHTProvider { + /// Start getting a value from the DHT. + fn get_value(&self, key: &KademliaKey); + + /// Start putting a value in the DHT. + fn put_value(&self, key: KademliaKey, value: Vec); +} + +impl NetworkDHTProvider for Arc +where + T: ?Sized, + T: NetworkDHTProvider, +{ + fn get_value(&self, key: &KademliaKey) { + T::get_value(self, key) + } + + fn put_value(&self, key: KademliaKey, value: Vec) { + T::put_value(self, key, value) + } +} + +/// Provides an ability to set a fork sync request for a particular block. +pub trait NetworkSyncForkRequest { + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). + fn set_sync_fork_request(&self, peers: Vec, hash: BlockHash, number: BlockNumber); +} + +impl NetworkSyncForkRequest for Arc +where + T: ?Sized, + T: NetworkSyncForkRequest, +{ + fn set_sync_fork_request(&self, peers: Vec, hash: BlockHash, number: BlockNumber) { + T::set_sync_fork_request(self, peers, hash, number) + } +} + +/// Overview status of the network. +#[derive(Clone)] +pub struct NetworkStatus { + /// Current global sync state. + pub sync_state: SyncState, + /// Target sync block number. + pub best_seen_block: Option>, + /// Number of peers participating in syncing. + pub num_sync_peers: u32, + /// Total number of connected peers + pub num_connected_peers: usize, + /// Total number of active peers. + pub num_active_peers: usize, + /// The total number of bytes received. + pub total_bytes_inbound: u64, + /// The total number of bytes sent. + pub total_bytes_outbound: u64, + /// State sync in progress. + pub state_sync: Option, + /// Warp sync in progress. + pub warp_sync: Option>, +} + +/// Provides high-level status information about network. +#[async_trait::async_trait] +pub trait NetworkStatusProvider { + /// High-level network status information. + /// + /// Returns an error if the `NetworkWorker` is no longer running. + async fn status(&self) -> Result, ()>; +} + +// Manual implementation to avoid extra boxing here +impl NetworkStatusProvider for Arc +where + T: ?Sized, + T: NetworkStatusProvider, +{ + fn status<'life0, 'async_trait>( + &'life0 self, + ) -> Pin, ()>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + T::status(self) + } +} + +/// Provides low-level API for manipulating network peers. +pub trait NetworkPeers { + /// Set authorized peers. + /// + /// Need a better solution to manage authorized peers, but now just use reserved peers for + /// prototyping. + fn set_authorized_peers(&self, peers: HashSet); + + /// Set authorized_only flag. + /// + /// Need a better solution to decide authorized_only, but now just use reserved_only flag for + /// prototyping. + fn set_authorized_only(&self, reserved_only: bool); + + /// Adds an address known to a node. + fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr); + + /// Report a given peer as either beneficial (+) or costly (-) according to the + /// given scalar. + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange); + + /// Disconnect from a node as soon as possible. + /// + /// This triggers the same effects as if the connection had closed itself spontaneously. + /// + /// See also [`NetworkPeers::remove_from_peers_set`], which has the same effect but also + /// prevents the local node from re-establishing an outgoing substream to this peer until it + /// is added again. + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>); + + /// Connect to unreserved peers and allow unreserved peers to connect for syncing purposes. + fn accept_unreserved_peers(&self); + + /// Disconnect from unreserved peers and deny new unreserved peers to connect for syncing + /// purposes. + fn deny_unreserved_peers(&self); + + /// Adds a `PeerId` and its address as reserved. The string should encode the address + /// and peer ID of the remote node. + /// + /// Returns an `Err` if the given string is not a valid multiaddress + /// or contains an invalid peer ID (which includes the local peer ID). + fn add_reserved_peer(&self, peer: String) -> Result<(), String>; + + /// Removes a `PeerId` from the list of reserved peers. + fn remove_reserved_peer(&self, peer_id: PeerId); + + /// Sets the reserved set of a protocol to the given set of peers. + /// + /// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also + /// consist of only `/p2p/`. + /// + /// The node will start establishing/accepting connections and substreams to/from peers in this + /// set, if it doesn't have any substream open with them yet. + /// + /// Note however, if a call to this function results in less peers on the reserved set, they + /// will not necessarily get disconnected (depending on available free slots in the peer set). + /// If you want to also disconnect those removed peers, you will have to call + /// `remove_from_peers_set` on those in addition to updating the reserved set. You can omit + /// this step if the peer set is in reserved only mode. + /// + /// Returns an `Err` if one of the given addresses is invalid or contains an + /// invalid peer ID (which includes the local peer ID). + fn set_reserved_peers( + &self, + protocol: Cow<'static, str>, + peers: HashSet, + ) -> Result<(), String>; + + /// Add peers to a peer set. + /// + /// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also + /// consist of only `/p2p/`. + /// + /// Returns an `Err` if one of the given addresses is invalid or contains an + /// invalid peer ID (which includes the local peer ID). + fn add_peers_to_reserved_set( + &self, + protocol: Cow<'static, str>, + peers: HashSet, + ) -> Result<(), String>; + + /// Remove peers from a peer set. + fn remove_peers_from_reserved_set(&self, protocol: Cow<'static, str>, peers: Vec); + + /// Add a peer to a set of peers. + /// + /// If the set has slots available, it will try to open a substream with this peer. + /// + /// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also + /// consist of only `/p2p/`. + /// + /// Returns an `Err` if one of the given addresses is invalid or contains an + /// invalid peer ID (which includes the local peer ID). + fn add_to_peers_set( + &self, + protocol: Cow<'static, str>, + peers: HashSet, + ) -> Result<(), String>; + + /// Remove peers from a peer set. + /// + /// If we currently have an open substream with this peer, it will soon be closed. + fn remove_from_peers_set(&self, protocol: Cow<'static, str>, peers: Vec); + + /// Returns the number of peers in the sync peer set we're connected to. + fn sync_num_connected(&self) -> usize; +} + +// Manual implementation to avoid extra boxing here +impl NetworkPeers for Arc +where + T: ?Sized, + T: NetworkPeers, +{ + fn set_authorized_peers(&self, peers: HashSet) { + T::set_authorized_peers(self, peers) + } + + fn set_authorized_only(&self, reserved_only: bool) { + T::set_authorized_only(self, reserved_only) + } + + fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) { + T::add_known_address(self, peer_id, addr) + } + + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { + T::report_peer(self, who, cost_benefit) + } + + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) { + T::disconnect_peer(self, who, protocol) + } + + fn accept_unreserved_peers(&self) { + T::accept_unreserved_peers(self) + } + + fn deny_unreserved_peers(&self) { + T::deny_unreserved_peers(self) + } + + fn add_reserved_peer(&self, peer: String) -> Result<(), String> { + T::add_reserved_peer(self, peer) + } + + fn remove_reserved_peer(&self, peer_id: PeerId) { + T::remove_reserved_peer(self, peer_id) + } + + fn set_reserved_peers( + &self, + protocol: Cow<'static, str>, + peers: HashSet, + ) -> Result<(), String> { + T::set_reserved_peers(self, protocol, peers) + } + + fn add_peers_to_reserved_set( + &self, + protocol: Cow<'static, str>, + peers: HashSet, + ) -> Result<(), String> { + T::add_peers_to_reserved_set(self, protocol, peers) + } + + fn remove_peers_from_reserved_set(&self, protocol: Cow<'static, str>, peers: Vec) { + T::remove_peers_from_reserved_set(self, protocol, peers) + } + + fn add_to_peers_set( + &self, + protocol: Cow<'static, str>, + peers: HashSet, + ) -> Result<(), String> { + T::add_to_peers_set(self, protocol, peers) + } + + fn remove_from_peers_set(&self, protocol: Cow<'static, str>, peers: Vec) { + T::remove_from_peers_set(self, protocol, peers) + } + + fn sync_num_connected(&self) -> usize { + T::sync_num_connected(self) + } +} + +/// Provides access to network-level event stream. +pub trait NetworkEventStream { + /// Returns a stream containing the events that happen on the network. + /// + /// If this method is called multiple times, the events are duplicated. + /// + /// The stream never ends (unless the `NetworkWorker` gets shut down). + /// + /// The name passed is used to identify the channel in the Prometheus metrics. Note that the + /// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having + /// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory + fn event_stream(&self, name: &'static str) -> Pin + Send>>; +} + +impl NetworkEventStream for Arc +where + T: ?Sized, + T: NetworkEventStream, +{ + fn event_stream(&self, name: &'static str) -> Pin + Send>> { + T::event_stream(self, name) + } +} + +/// Trait for providing information about the local network state +pub trait NetworkStateInfo { + /// Returns the local external addresses. + fn external_addresses(&self) -> Vec; + + /// Returns the local Peer ID. + fn local_peer_id(&self) -> PeerId; +} + +impl NetworkStateInfo for Arc +where + T: ?Sized, + T: NetworkStateInfo, +{ + fn external_addresses(&self) -> Vec { + T::external_addresses(self) + } + + fn local_peer_id(&self) -> PeerId { + T::local_peer_id(self) + } +} + +/// Reserved slot in the notifications buffer, ready to accept data. +pub trait NotificationSenderReady { + /// Consumes this slots reservation and actually queues the notification. + /// + /// NOTE: Traits can't consume itself, but calling this method second time will return an error. + fn send(&mut self, notification: Vec) -> Result<(), NotificationSenderError>; +} + +/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol. +#[async_trait::async_trait] +pub trait NotificationSender { + /// Returns a future that resolves when the `NotificationSender` is ready to send a + /// notification. + async fn ready(&self) + -> Result, NotificationSenderError>; +} + +/// Error returned by [`NetworkNotification::notification_sender`]. +#[derive(Debug, thiserror::Error)] +pub enum NotificationSenderError { + /// The notification receiver has been closed, usually because the underlying connection + /// closed. + /// + /// Some of the notifications most recently sent may not have been received. However, + /// the peer may still be connected and a new `NotificationSender` for the same + /// protocol obtained from [`NetworkNotification::notification_sender`]. + #[error("The notification receiver has been closed")] + Closed, + /// Protocol name hasn't been registered. + #[error("Protocol name hasn't been registered")] + BadProtocol, +} + +/// Provides ability to send network notifications. +pub trait NetworkNotification { + /// Appends a notification to the buffer of pending outgoing notifications with the given peer. + /// Has no effect if the notifications channel with this protocol name is not open. + /// + /// If the buffer of pending outgoing notifications with that peer is full, the notification + /// is silently dropped and the connection to the remote will start being shut down. This + /// happens if you call this method at a higher rate than the rate at which the peer processes + /// these notifications, or if the available network bandwidth is too low. + /// + /// For this reason, this method is considered soft-deprecated. You are encouraged to use + /// [`NetworkNotification::notification_sender`] instead. + /// + /// > **Note**: The reason why this is a no-op in the situation where we have no channel is + /// > that we don't guarantee message delivery anyway. Networking issues can cause + /// > connections to drop at any time, and higher-level logic shouldn't differentiate + /// > between the remote voluntarily closing a substream or a network error + /// > preventing the message from being delivered. + /// + /// The protocol must have been registered with + /// `crate::config::NetworkConfiguration::notifications_protocols`. + fn write_notification(&self, target: PeerId, protocol: Cow<'static, str>, message: Vec); + + /// Obtains a [`NotificationSender`] for a connected peer, if it exists. + /// + /// A `NotificationSender` is scoped to a particular connection to the peer that holds + /// a receiver. With a `NotificationSender` at hand, sending a notification is done in two + /// steps: + /// + /// 1. [`NotificationSender::ready`] is used to wait for the sender to become ready + /// for another notification, yielding a [`NotificationSenderReady`] token. + /// 2. [`NotificationSenderReady::send`] enqueues the notification for sending. This operation + /// can only fail if the underlying notification substream or connection has suddenly closed. + /// + /// An error is returned by [`NotificationSenderReady::send`] if there exists no open + /// notifications substream with that combination of peer and protocol, or if the remote + /// has asked to close the notifications substream. If that happens, it is guaranteed that an + /// [`Event::NotificationStreamClosed`] has been generated on the stream returned by + /// [`NetworkEventStream::event_stream`]. + /// + /// If the remote requests to close the notifications substream, all notifications successfully + /// enqueued using [`NotificationSenderReady::send`] will finish being sent out before the + /// substream actually gets closed, but attempting to enqueue more notifications will now + /// return an error. It is however possible for the entire connection to be abruptly closed, + /// in which case enqueued notifications will be lost. + /// + /// The protocol must have been registered with + /// `crate::config::NetworkConfiguration::notifications_protocols`. + /// + /// # Usage + /// + /// This method returns a struct that allows waiting until there is space available in the + /// buffer of messages towards the given peer. If the peer processes notifications at a slower + /// rate than we send them, this buffer will quickly fill up. + /// + /// As such, you should never do something like this: + /// + /// ```ignore + /// // Do NOT do this + /// for peer in peers { + /// if let Ok(n) = network.notification_sender(peer, ...) { + /// if let Ok(s) = n.ready().await { + /// let _ = s.send(...); + /// } + /// } + /// } + /// ``` + /// + /// Doing so would slow down all peers to the rate of the slowest one. A malicious or + /// malfunctioning peer could intentionally process notifications at a very slow rate. + /// + /// Instead, you are encouraged to maintain your own buffer of notifications on top of the one + /// maintained by `sc-network`, and use `notification_sender` to progressively send out + /// elements from your buffer. If this additional buffer is full (which will happen at some + /// point if the peer is too slow to process notifications), appropriate measures can be taken, + /// such as removing non-critical notifications from the buffer or disconnecting the peer + /// using [`NetworkPeers::disconnect_peer`]. + /// + /// + /// Notifications Per-peer buffer + /// broadcast +-------> of notifications +--> `notification_sender` +--> Internet + /// ^ (not covered by + /// | sc-network) + /// + + /// Notifications should be dropped + /// if buffer is full + /// + /// + /// See also the `sc-network-gossip` crate for a higher-level way to send notifications. + fn notification_sender( + &self, + target: PeerId, + protocol: Cow<'static, str>, + ) -> Result, NotificationSenderError>; +} + +impl NetworkNotification for Arc +where + T: ?Sized, + T: NetworkNotification, +{ + fn write_notification(&self, target: PeerId, protocol: Cow<'static, str>, message: Vec) { + T::write_notification(self, target, protocol, message) + } + + fn notification_sender( + &self, + target: PeerId, + protocol: Cow<'static, str>, + ) -> Result, NotificationSenderError> { + T::notification_sender(self, target, protocol) + } +} + +/// Provides ability to send network requests. +#[async_trait::async_trait] +pub trait NetworkRequest { + /// Sends a single targeted request to a specific peer. On success, returns the response of + /// the peer. + /// + /// Request-response protocols are a way to complement notifications protocols, but + /// notifications should remain the default ways of communicating information. For example, a + /// peer can announce something through a notification, after which the recipient can obtain + /// more information by performing a request. + /// As such, call this function with `IfDisconnected::ImmediateError` for `connect`. This way + /// you will get an error immediately for disconnected peers, instead of waiting for a + /// potentially very long connection attempt, which would suggest that something is wrong + /// anyway, as you are supposed to be connected because of the notification protocol. + /// + /// No limit or throttling of concurrent outbound requests per peer and protocol are enforced. + /// Such restrictions, if desired, need to be enforced at the call site(s). + /// + /// The protocol must have been registered through + /// `NetworkConfiguration::request_response_protocols`. + async fn request( + &self, + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + connect: IfDisconnected, + ) -> Result, RequestFailure>; + + /// Variation of `request` which starts a request whose response is delivered on a provided + /// channel. + /// + /// Instead of blocking and waiting for a reply, this function returns immediately, sending + /// responses via the passed in sender. This alternative API exists to make it easier to + /// integrate with message passing APIs. + /// + /// Keep in mind that the connected receiver might receive a `Canceled` event in case of a + /// closing connection. This is expected behaviour. With `request` you would get a + /// `RequestFailure::Network(OutboundFailure::ConnectionClosed)` in that case. + fn start_request( + &self, + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + tx: oneshot::Sender, RequestFailure>>, + connect: IfDisconnected, + ); +} + +// Manual implementation to avoid extra boxing here +impl NetworkRequest for Arc +where + T: ?Sized, + T: NetworkRequest, +{ + fn request<'life0, 'async_trait>( + &'life0 self, + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + connect: IfDisconnected, + ) -> Pin, RequestFailure>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + T::request(self, target, protocol, request, connect) + } + + fn start_request( + &self, + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + tx: oneshot::Sender, RequestFailure>>, + connect: IfDisconnected, + ) { + T::start_request(self, target, protocol, request, tx, connect) + } +} + +/// Provides ability to propagate transactions over the network. +pub trait NetworkTransaction { + /// You may call this when new transactions are imported by the transaction pool. + /// + /// All transactions will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration and propagated to peers. + fn trigger_repropagate(&self); + + /// You must call when new transaction is imported by the transaction pool. + /// + /// This transaction will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration and propagated to peers. + fn propagate_transaction(&self, hash: H); +} + +impl NetworkTransaction for Arc +where + T: ?Sized, + T: NetworkTransaction, +{ + fn trigger_repropagate(&self) { + T::trigger_repropagate(self) + } + + fn propagate_transaction(&self, hash: H) { + T::propagate_transaction(self, hash) + } +} + +/// Provides ability to announce blocks to the network. +pub trait NetworkBlock { + /// Make sure an important block is propagated to peers. + /// + /// In chain-based consensus, we often need to make sure non-best forks are + /// at least temporarily synced. This function forces such an announcement. + fn announce_block(&self, hash: BlockHash, data: Option>); + + /// Inform the network service about new best imported block. + fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber); +} + +impl NetworkBlock for Arc +where + T: ?Sized, + T: NetworkBlock, +{ + fn announce_block(&self, hash: BlockHash, data: Option>) { + T::announce_block(self, hash, data) + } + + fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber) { + T::new_best_block_imported(self, hash, number) + } +} diff --git a/client/network/src/service/signature.rs b/client/network/common/src/service/signature.rs similarity index 96% rename from client/network/src/service/signature.rs rename to client/network/common/src/service/signature.rs index d21d28a3007b5..602ef3d82979a 100644 --- a/client/network/src/service/signature.rs +++ b/client/network/common/src/service/signature.rs @@ -18,7 +18,10 @@ // // If you read this, you are very thorough, congratulations. -use super::*; +use libp2p::{ + identity::{error::SigningError, Keypair, PublicKey}, + PeerId, +}; /// A result of signing a message with a network identity. Since `PeerId` is potentially a hash of a /// `PublicKey`, you need to reveal the `PublicKey` next to the signature, so the verifier can check diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 515608df13d0f..4ff415788f4ea 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -21,7 +21,7 @@ use crate::{ discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, peer_info, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, - request_responses, DhtEvent, ObservedRole, + request_responses, }; use bytes::Bytes; @@ -41,7 +41,11 @@ use log::debug; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::import_queue::{IncomingBlock, Origin}; -use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig}; +use sc_network_common::{ + config::ProtocolId, + protocol::event::{DhtEvent, ObservedRole}, + request_responses::{IfDisconnected, ProtocolConfig, RequestFailure}, +}; use sc_peerset::PeersetHandle; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::BlockOrigin; @@ -57,9 +61,7 @@ use std::{ time::Duration, }; -pub use crate::request_responses::{ - IfDisconnected, InboundFailure, OutboundFailure, RequestFailure, RequestId, ResponseFailure, -}; +pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure}; /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 83bc1075b8bad..b1d70c28289bd 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -262,22 +262,26 @@ pub mod transactions; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; -pub use protocol::{ - event::{DhtEvent, Event, ObservedRole}, - PeerInfo, -}; -pub use sc_network_common::sync::{ - warp::{WarpSyncPhase, WarpSyncProgress}, - StateDownloadProgress, SyncState, +pub use protocol::PeerInfo; +pub use sc_network_common::{ + protocol::event::{DhtEvent, Event, ObservedRole}, + request_responses::{IfDisconnected, RequestFailure}, + service::{ + KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkRequest, NetworkSigner, + NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, + NetworkTransaction, Signature, SigningError, + }, + sync::{ + warp::{WarpSyncPhase, WarpSyncProgress}, + StateDownloadProgress, SyncState, + }, }; pub use service::{ - DecodingError, IfDisconnected, KademliaKey, Keypair, NetworkService, NetworkWorker, - NotificationSender, NotificationSenderReady, OutboundFailure, PublicKey, RequestFailure, - Signature, SigningError, + DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, + NotificationSenderReady, OutboundFailure, PublicKey, }; pub use sc_peerset::ReputationChange; -use sp_runtime::traits::{Block as BlockT, NumberFor}; /// The maximum allowed number of established connections per peer. /// @@ -296,35 +300,3 @@ pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync impl ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} - -/// Trait for providing information about the local network state -pub trait NetworkStateInfo { - /// Returns the local external addresses. - fn external_addresses(&self) -> Vec; - - /// Returns the local Peer ID. - fn local_peer_id(&self) -> PeerId; -} - -/// Overview status of the network. -#[derive(Clone)] -pub struct NetworkStatus { - /// Current global sync state. - pub sync_state: SyncState, - /// Target sync block number. - pub best_seen_block: Option>, - /// Number of peers participating in syncing. - pub num_sync_peers: u32, - /// Total number of connected peers - pub num_connected_peers: usize, - /// Total number of active peers. - pub num_active_peers: usize, - /// The total number of bytes received. - pub total_bytes_inbound: u64, - /// The total number of bytes sent. - pub total_bytes_outbound: u64, - /// State sync in progress. - pub state_sync: Option, - /// Warp sync in progress. - pub warp_sync: Option>, -} diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index c51667017d15c..351e7d207ad1e 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -18,7 +18,6 @@ use crate::{ config, error, - request_responses::RequestFailure, utils::{interval, LruHashSet}, }; @@ -45,6 +44,7 @@ use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin}; use sc_network_common::{ config::ProtocolId, + request_responses::RequestFailure, sync::{ message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState, @@ -76,7 +76,6 @@ use std::{ mod notifications; -pub mod event; pub mod message; pub use notifications::{NotificationsSink, NotifsHandlerError, Ready}; diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 0d8c6c33b1c95..9eab85a4c1ce1 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -50,7 +50,9 @@ use libp2p::{ NetworkBehaviourAction, PollParameters, }, }; -use sc_network_common::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; +use sc_network_common::request_responses::{ + IfDisconnected, IncomingRequest, OutgoingResponse, ProtocolConfig, RequestFailure, +}; use std::{ borrow::Cow, collections::{hash_map::Entry, HashMap}, @@ -118,26 +120,6 @@ impl From<(Cow<'static, str>, RequestId)> for ProtocolRequestId { } } -/// When sending a request, what to do on a disconnected recipient. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub enum IfDisconnected { - /// Try to connect to the peer. - TryConnect, - /// Just fail if the destination is not yet connected. - ImmediateError, -} - -/// Convenience functions for `IfDisconnected`. -impl IfDisconnected { - /// Shall we connect to a disconnected peer? - pub fn should_connect(self) -> bool { - match self { - Self::TryConnect => true, - Self::ImmediateError => false, - } - } -} - /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. @@ -787,23 +769,6 @@ pub enum RegisterError { DuplicateProtocol(Cow<'static, str>), } -/// Error in a request. -#[derive(Debug, thiserror::Error)] -#[allow(missing_docs)] -pub enum RequestFailure { - #[error("We are not currently connected to the requested peer.")] - NotConnected, - #[error("Given protocol hasn't been registered.")] - UnknownProtocol, - #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")] - Refused, - #[error("The remote replied, but the local node is no longer interested in the response.")] - Obsolete, - /// Problem on the network. - #[error("Problem on the network: {0}")] - Network(OutboundFailure), -} - /// Error when processing a request sent by a remote. #[derive(Debug, thiserror::Error)] pub enum ResponseFailure { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 1210b0ca64224..bf15a9250d82c 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -37,16 +37,17 @@ use crate::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, protocol::{ - self, event::Event, message::generic::Roles, NotificationsSink, NotifsHandlerError, - PeerInfo, Protocol, Ready, + self, message::generic::Roles, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, + Ready, }, - transactions, transport, DhtEvent, ExHashT, NetworkStateInfo, NetworkStatus, ReputationChange, + transactions, transport, ExHashT, ReputationChange, }; use codec::Encode as _; use futures::{channel::oneshot, prelude::*}; use libp2p::{ core::{either::EitherError, upgrade, ConnectedPoint, Executor}, + kad::record::Key as KademliaKey, multiaddr, ping::Failure as PingFailure, swarm::{ @@ -60,7 +61,17 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; -use sc_network_common::sync::{SyncState, SyncStatus}; +use sc_network_common::{ + protocol::event::{DhtEvent, Event}, + request_responses::{IfDisconnected, RequestFailure}, + service::{ + NetworkDHTProvider, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkSigner, + NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, + NotificationSender as NotificationSenderT, NotificationSenderError, + NotificationSenderReady as NotificationSenderReadyT, Signature, SigningError, + }, + sync::{SyncState, SyncStatus}, +}; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::{HeaderBackend, HeaderMetadata}; @@ -81,24 +92,15 @@ use std::{ task::Poll, }; -pub use behaviour::{ - IfDisconnected, InboundFailure, OutboundFailure, RequestFailure, ResponseFailure, -}; +pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; mod metrics; mod out_events; -mod signature; #[cfg(test)] mod tests; -pub use libp2p::{ - identity::{ - error::{DecodingError, SigningError}, - Keypair, PublicKey, - }, - kad::record::Key as KademliaKey, -}; -pub use signature::Signature; +pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; +use sc_network_common::service::{NetworkBlock, NetworkRequest, NetworkTransaction}; /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService { @@ -723,289 +725,6 @@ where } impl NetworkService { - /// Returns the local `PeerId`. - pub fn local_peer_id(&self) -> &PeerId { - &self.local_peer_id - } - - /// Signs the message with the `KeyPair` that defined the local `PeerId`. - pub fn sign_with_local_identity( - &self, - msg: impl AsRef<[u8]>, - ) -> Result { - Signature::sign_message(msg.as_ref(), &self.local_identity) - } - - /// Set authorized peers. - /// - /// Need a better solution to manage authorized peers, but now just use reserved peers for - /// prototyping. - pub fn set_authorized_peers(&self, peers: HashSet) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReserved(peers)); - } - - /// Set authorized_only flag. - /// - /// Need a better solution to decide authorized_only, but now just use reserved_only flag for - /// prototyping. - pub fn set_authorized_only(&self, reserved_only: bool) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); - } - - /// Adds an address known to a node. - pub fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); - } - - /// Appends a notification to the buffer of pending outgoing notifications with the given peer. - /// Has no effect if the notifications channel with this protocol name is not open. - /// - /// If the buffer of pending outgoing notifications with that peer is full, the notification - /// is silently dropped and the connection to the remote will start being shut down. This - /// happens if you call this method at a higher rate than the rate at which the peer processes - /// these notifications, or if the available network bandwidth is too low. - /// - /// For this reason, this method is considered soft-deprecated. You are encouraged to use - /// [`NetworkService::notification_sender`] instead. - /// - /// > **Note**: The reason why this is a no-op in the situation where we have no channel is - /// > that we don't guarantee message delivery anyway. Networking issues can cause - /// > connections to drop at any time, and higher-level logic shouldn't differentiate - /// > between the remote voluntarily closing a substream or a network error - /// > preventing the message from being delivered. - /// - /// The protocol must have been registered with - /// `crate::config::NetworkConfiguration::notifications_protocols`. - pub fn write_notification( - &self, - target: PeerId, - protocol: Cow<'static, str>, - message: Vec, - ) { - // We clone the `NotificationsSink` in order to be able to unlock the network-wide - // `peers_notifications_sinks` mutex as soon as possible. - let sink = { - let peers_notifications_sinks = self.peers_notifications_sinks.lock(); - if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) { - sink.clone() - } else { - // Notification silently discarded, as documented. - debug!( - target: "sub-libp2p", - "Attempted to send notification on missing or closed substream: {}, {:?}", - target, protocol, - ); - return - } - }; - - if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() { - notifications_sizes_metric - .with_label_values(&["out", &protocol]) - .observe(message.len() as f64); - } - - // Sending is communicated to the `NotificationsSink`. - trace!( - target: "sub-libp2p", - "External API => Notification({:?}, {:?}, {} bytes)", - target, protocol, message.len() - ); - trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target); - sink.send_sync_notification(message); - } - - /// Obtains a [`NotificationSender`] for a connected peer, if it exists. - /// - /// A `NotificationSender` is scoped to a particular connection to the peer that holds - /// a receiver. With a `NotificationSender` at hand, sending a notification is done in two - /// steps: - /// - /// 1. [`NotificationSender::ready`] is used to wait for the sender to become ready - /// for another notification, yielding a [`NotificationSenderReady`] token. - /// 2. [`NotificationSenderReady::send`] enqueues the notification for sending. This operation - /// can only fail if the underlying notification substream or connection has suddenly closed. - /// - /// An error is returned by [`NotificationSenderReady::send`] if there exists no open - /// notifications substream with that combination of peer and protocol, or if the remote - /// has asked to close the notifications substream. If that happens, it is guaranteed that an - /// [`Event::NotificationStreamClosed`] has been generated on the stream returned by - /// [`NetworkService::event_stream`]. - /// - /// If the remote requests to close the notifications substream, all notifications successfully - /// enqueued using [`NotificationSenderReady::send`] will finish being sent out before the - /// substream actually gets closed, but attempting to enqueue more notifications will now - /// return an error. It is however possible for the entire connection to be abruptly closed, - /// in which case enqueued notifications will be lost. - /// - /// The protocol must have been registered with - /// `crate::config::NetworkConfiguration::notifications_protocols`. - /// - /// # Usage - /// - /// This method returns a struct that allows waiting until there is space available in the - /// buffer of messages towards the given peer. If the peer processes notifications at a slower - /// rate than we send them, this buffer will quickly fill up. - /// - /// As such, you should never do something like this: - /// - /// ```ignore - /// // Do NOT do this - /// for peer in peers { - /// if let Ok(n) = network.notification_sender(peer, ...) { - /// if let Ok(s) = n.ready().await { - /// let _ = s.send(...); - /// } - /// } - /// } - /// ``` - /// - /// Doing so would slow down all peers to the rate of the slowest one. A malicious or - /// malfunctioning peer could intentionally process notifications at a very slow rate. - /// - /// Instead, you are encouraged to maintain your own buffer of notifications on top of the one - /// maintained by `sc-network`, and use `notification_sender` to progressively send out - /// elements from your buffer. If this additional buffer is full (which will happen at some - /// point if the peer is too slow to process notifications), appropriate measures can be taken, - /// such as removing non-critical notifications from the buffer or disconnecting the peer - /// using [`NetworkService::disconnect_peer`]. - /// - /// - /// Notifications Per-peer buffer - /// broadcast +-------> of notifications +--> `notification_sender` +--> Internet - /// ^ (not covered by - /// | sc-network) - /// + - /// Notifications should be dropped - /// if buffer is full - /// - /// - /// See also the `sc-network-gossip` crate for a higher-level way to send notifications. - pub fn notification_sender( - &self, - target: PeerId, - protocol: Cow<'static, str>, - ) -> Result { - // We clone the `NotificationsSink` in order to be able to unlock the network-wide - // `peers_notifications_sinks` mutex as soon as possible. - let sink = { - let peers_notifications_sinks = self.peers_notifications_sinks.lock(); - if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) { - sink.clone() - } else { - return Err(NotificationSenderError::Closed) - } - }; - - let notification_size_metric = self - .notifications_sizes_metric - .as_ref() - .map(|histogram| histogram.with_label_values(&["out", &protocol])); - - Ok(NotificationSender { sink, protocol_name: protocol, notification_size_metric }) - } - - /// Returns a stream containing the events that happen on the network. - /// - /// If this method is called multiple times, the events are duplicated. - /// - /// The stream never ends (unless the `NetworkWorker` gets shut down). - /// - /// The name passed is used to identify the channel in the Prometheus metrics. Note that the - /// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having - /// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory - pub fn event_stream(&self, name: &'static str) -> impl Stream { - let (tx, rx) = out_events::channel(name); - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); - rx - } - - /// Sends a single targeted request to a specific peer. On success, returns the response of - /// the peer. - /// - /// Request-response protocols are a way to complement notifications protocols, but - /// notifications should remain the default ways of communicating information. For example, a - /// peer can announce something through a notification, after which the recipient can obtain - /// more information by performing a request. - /// As such, call this function with `IfDisconnected::ImmediateError` for `connect`. This way - /// you will get an error immediately for disconnected peers, instead of waiting for a - /// potentially very long connection attempt, which would suggest that something is wrong - /// anyway, as you are supposed to be connected because of the notification protocol. - /// - /// No limit or throttling of concurrent outbound requests per peer and protocol are enforced. - /// Such restrictions, if desired, need to be enforced at the call site(s). - /// - /// The protocol must have been registered through - /// [`NetworkConfiguration::request_response_protocols`]( - /// crate::config::NetworkConfiguration::request_response_protocols). - pub async fn request( - &self, - target: PeerId, - protocol: impl Into>, - request: Vec, - connect: IfDisconnected, - ) -> Result, RequestFailure> { - let (tx, rx) = oneshot::channel(); - - self.start_request(target, protocol, request, tx, connect); - - match rx.await { - Ok(v) => v, - // The channel can only be closed if the network worker no longer exists. If the - // network worker no longer exists, then all connections to `target` are necessarily - // closed, and we legitimately report this situation as a "ConnectionClosed". - Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)), - } - } - - /// Variation of `request` which starts a request whose response is delivered on a provided - /// channel. - /// - /// Instead of blocking and waiting for a reply, this function returns immediately, sending - /// responses via the passed in sender. This alternative API exists to make it easier to - /// integrate with message passing APIs. - /// - /// Keep in mind that the connected receiver might receive a `Canceled` event in case of a - /// closing connection. This is expected behaviour. With `request` you would get a - /// `RequestFailure::Network(OutboundFailure::ConnectionClosed)` in that case. - pub fn start_request( - &self, - target: PeerId, - protocol: impl Into>, - request: Vec, - tx: oneshot::Sender, RequestFailure>>, - connect: IfDisconnected, - ) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { - target, - protocol: protocol.into(), - request, - pending_response: tx, - connect, - }); - } - - /// High-level network status information. - /// - /// Returns an error if the `NetworkWorker` is no longer running. - pub async fn status(&self) -> Result, ()> { - let (tx, rx) = oneshot::channel(); - - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx }); - - match rx.await { - Ok(v) => v.map_err(|_| ()), - // The channel can only be closed if the network worker no longer exists. - Err(_) => Err(()), - } - } - /// Get network state. /// /// **Note**: Use this only for debugging. This API is unstable. There are warnings literally @@ -1026,74 +745,97 @@ impl NetworkService { } } - /// You may call this when new transactions are imported by the transaction pool. - /// - /// All transactions will be fetched from the `TransactionPool` that was passed at - /// initialization as part of the configuration and propagated to peers. - pub fn trigger_repropagate(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions); - } - - /// You must call when new transaction is imported by the transaction pool. + /// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates. /// - /// This transaction will be fetched from the `TransactionPool` that was passed at - /// initialization as part of the configuration and propagated to peers. - pub fn propagate_transaction(&self, hash: H) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash)); - } + /// Returns an `Err` if one of the given addresses is invalid or contains an + /// invalid peer ID (which includes the local peer ID). + fn split_multiaddr_and_peer_id( + &self, + peers: HashSet, + ) -> Result, String> { + peers + .into_iter() + .map(|mut addr| { + let peer = match addr.pop() { + Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) + .map_err(|_| "Invalid PeerId format".to_string())?, + _ => return Err("Missing PeerId from address".to_string()), + }; - /// Make sure an important block is propagated to peers. - /// - /// In chain-based consensus, we often need to make sure non-best forks are - /// at least temporarily synced. This function forces such an announcement. - pub fn announce_block(&self, hash: B::Hash, data: Option>) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); + // Make sure the local peer ID is never added to the PSM + // or added as a "known address", even if given. + if peer == self.local_peer_id { + Err("Local peer ID in peer set.".to_string()) + } else { + Ok((peer, addr)) + } + }) + .collect::, String>>() } +} - /// Report a given peer as either beneficial (+) or costly (-) according to the - /// given scalar. - pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { - self.peerset.report_peer(who, cost_benefit); +impl sp_consensus::SyncOracle for NetworkService { + fn is_major_syncing(&self) -> bool { + self.is_major_syncing.load(Ordering::Relaxed) } - /// Disconnect from a node as soon as possible. - /// - /// This triggers the same effects as if the connection had closed itself spontaneously. - /// - /// See also [`NetworkService::remove_from_peers_set`], which has the same effect but also - /// prevents the local node from re-establishing an outgoing substream to this peer until it - /// is added again. - pub fn disconnect_peer(&self, who: PeerId, protocol: impl Into>) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who, protocol.into())); + fn is_offline(&self) -> bool { + self.num_connected.load(Ordering::Relaxed) == 0 } +} +impl sc_consensus::JustificationSyncLink for NetworkService { /// Request a justification for the given block from the network. /// /// On success, the justification will be passed to the import queue that was part at /// initialization as part of the configuration. - pub fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + fn request_justification(&self, hash: &B::Hash, number: NumberFor) { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::RequestJustification(*hash, number)); } - /// Clear all pending justification requests. - pub fn clear_justification_requests(&self) { + fn clear_justification_requests(&self) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::ClearJustificationRequests); } +} - /// Are we in the process of downloading the chain? - pub fn is_major_syncing(&self) -> bool { - self.is_major_syncing.load(Ordering::Relaxed) +impl NetworkStateInfo for NetworkService +where + B: sp_runtime::traits::Block, + H: ExHashT, +{ + /// Returns the local external addresses. + fn external_addresses(&self) -> Vec { + self.external_addresses.lock().clone() + } + + /// Returns the local Peer ID. + fn local_peer_id(&self) -> PeerId { + self.local_peer_id } +} +impl NetworkSigner for NetworkService +where + B: sp_runtime::traits::Block, + H: ExHashT, +{ + fn sign_with_local_identity(&self, msg: impl AsRef<[u8]>) -> Result { + Signature::sign_message(msg.as_ref(), &self.local_identity) + } +} + +impl NetworkDHTProvider for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ /// Start getting a value from the DHT. /// /// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an /// item on the [`NetworkWorker`] stream. - pub fn get_value(&self, key: &KademliaKey) { + fn get_value(&self, key: &KademliaKey) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone())); } @@ -1101,27 +843,86 @@ impl NetworkService { /// /// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an /// item on the [`NetworkWorker`] stream. - pub fn put_value(&self, key: KademliaKey, value: Vec) { + fn put_value(&self, key: KademliaKey, value: Vec) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value)); } +} + +impl NetworkSyncForkRequest> for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + /// Configure an explicit fork sync request. + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// `set_sync_fork_request` should only be used if external code detects that there's + /// a stale fork missing. + /// Passing empty `peers` set effectively removes the sync request. + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number)); + } +} - /// Connect to unreserved peers and allow unreserved peers to connect for syncing purposes. - pub fn accept_unreserved_peers(&self) { +#[async_trait::async_trait] +impl NetworkStatusProvider for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + async fn status(&self) -> Result, ()> { + let (tx, rx) = oneshot::channel(); + + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx }); + + match rx.await { + Ok(v) => v.map_err(|_| ()), + // The channel can only be closed if the network worker no longer exists. + Err(_) => Err(()), + } + } +} + +impl NetworkPeers for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + fn set_authorized_peers(&self, peers: HashSet) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReserved(peers)); + } + + fn set_authorized_only(&self, reserved_only: bool) { + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::SetReservedOnly(reserved_only)); + } + + fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) { + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); + } + + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { + self.peerset.report_peer(who, cost_benefit); + } + + fn disconnect_peer(&self, who: PeerId, protocol: Cow<'static, str>) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who, protocol)); + } + + fn accept_unreserved_peers(&self) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(false)); } - /// Disconnect from unreserved peers and deny new unreserved peers to connect for syncing - /// purposes. - pub fn deny_unreserved_peers(&self) { + fn deny_unreserved_peers(&self) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SetReservedOnly(true)); } - /// Adds a `PeerId` and its address as reserved. The string should encode the address - /// and peer ID of the remote node. - /// - /// Returns an `Err` if the given string is not a valid multiaddress - /// or contains an invalid peer ID (which includes the local peer ID). - pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> { + fn add_reserved_peer(&self, peer: String) -> Result<(), String> { let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?; // Make sure the local peer ID is never added to the PSM. if peer_id == self.local_peer_id { @@ -1135,28 +936,11 @@ impl NetworkService { Ok(()) } - /// Removes a `PeerId` from the list of reserved peers. - pub fn remove_reserved_peer(&self, peer_id: PeerId) { + fn remove_reserved_peer(&self, peer_id: PeerId) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RemoveReserved(peer_id)); } - /// Sets the reserved set of a protocol to the given set of peers. - /// - /// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also - /// consist of only `/p2p/`. - /// - /// The node will start establishing/accepting connections and substreams to/from peers in this - /// set, if it doesn't have any substream open with them yet. - /// - /// Note however, if a call to this function results in less peers on the reserved set, they - /// will not necessarily get disconnected (depending on available free slots in the peer set). - /// If you want to also disconnect those removed peers, you will have to call - /// `remove_from_peers_set` on those in addition to updating the reserved set. You can omit - /// this step if the peer set is in reserved only mode. - /// - /// Returns an `Err` if one of the given addresses is invalid or contains an - /// invalid peer ID (which includes the local peer ID). - pub fn set_reserved_peers( + fn set_reserved_peers( &self, protocol: Cow<'static, str>, peers: HashSet, @@ -1187,14 +971,7 @@ impl NetworkService { Ok(()) } - /// Add peers to a peer set. - /// - /// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also - /// consist of only `/p2p/`. - /// - /// Returns an `Err` if one of the given addresses is invalid or contains an - /// invalid peer ID (which includes the local peer ID). - pub fn add_peers_to_reserved_set( + fn add_peers_to_reserved_set( &self, protocol: Cow<'static, str>, peers: HashSet, @@ -1220,8 +997,7 @@ impl NetworkService { Ok(()) } - /// Remove peers from a peer set. - pub fn remove_peers_from_reserved_set(&self, protocol: Cow<'static, str>, peers: Vec) { + fn remove_peers_from_reserved_set(&self, protocol: Cow<'static, str>, peers: Vec) { for peer_id in peers.into_iter() { let _ = self .to_worker @@ -1229,26 +1005,7 @@ impl NetworkService { } } - /// Configure an explicit fork sync request. - /// Note that this function should not be used for recent blocks. - /// Sync should be able to download all the recent forks normally. - /// `set_sync_fork_request` should only be used if external code detects that there's - /// a stale fork missing. - /// Passing empty `peers` set effectively removes the sync request. - pub fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number)); - } - - /// Add a peer to a set of peers. - /// - /// If the set has slots available, it will try to open a substream with this peer. - /// - /// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also - /// consist of only `/p2p/`. - /// - /// Returns an `Err` if one of the given addresses is invalid or contains an - /// invalid peer ID (which includes the local peer ID). - pub fn add_to_peers_set( + fn add_to_peers_set( &self, protocol: Cow<'static, str>, peers: HashSet, @@ -1274,10 +1031,7 @@ impl NetworkService { Ok(()) } - /// Remove peers from a peer set. - /// - /// If we currently have an open substream with this peer, it will soon be closed. - pub fn remove_from_peers_set(&self, protocol: Cow<'static, str>, peers: Vec) { + fn remove_from_peers_set(&self, protocol: Cow<'static, str>, peers: Vec) { for peer_id in peers.into_iter() { let _ = self .to_worker @@ -1285,90 +1039,158 @@ impl NetworkService { } } - /// Returns the number of peers we're connected to. - pub fn num_connected(&self) -> usize { + fn sync_num_connected(&self) -> usize { self.num_connected.load(Ordering::Relaxed) } +} - /// Inform the network service about new best imported block. - pub fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor) { - let _ = self - .to_worker - .unbounded_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number)); +impl NetworkEventStream for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + fn event_stream(&self, name: &'static str) -> Pin + Send>> { + let (tx, rx) = out_events::channel(name); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); + Box::pin(rx) } +} - /// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates. - /// - /// Returns an `Err` if one of the given addresses is invalid or contains an - /// invalid peer ID (which includes the local peer ID). - fn split_multiaddr_and_peer_id( - &self, - peers: HashSet, - ) -> Result, String> { - peers - .into_iter() - .map(|mut addr| { - let peer = match addr.pop() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) - .map_err(|_| "Invalid PeerId format".to_string())?, - _ => return Err("Missing PeerId from address".to_string()), - }; +impl NetworkNotification for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + fn write_notification(&self, target: PeerId, protocol: Cow<'static, str>, message: Vec) { + // We clone the `NotificationsSink` in order to be able to unlock the network-wide + // `peers_notifications_sinks` mutex as soon as possible. + let sink = { + let peers_notifications_sinks = self.peers_notifications_sinks.lock(); + if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) { + sink.clone() + } else { + // Notification silently discarded, as documented. + debug!( + target: "sub-libp2p", + "Attempted to send notification on missing or closed substream: {}, {:?}", + target, protocol, + ); + return + } + }; - // Make sure the local peer ID is never added to the PSM - // or added as a "known address", even if given. - if peer == self.local_peer_id { - Err("Local peer ID in peer set.".to_string()) - } else { - Ok((peer, addr)) - } - }) - .collect::, String>>() - } -} + if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() { + notifications_sizes_metric + .with_label_values(&["out", &protocol]) + .observe(message.len() as f64); + } -impl sp_consensus::SyncOracle for NetworkService { - fn is_major_syncing(&mut self) -> bool { - Self::is_major_syncing(self) + // Sending is communicated to the `NotificationsSink`. + trace!( + target: "sub-libp2p", + "External API => Notification({:?}, {:?}, {} bytes)", + target, protocol, message.len() + ); + trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target); + sink.send_sync_notification(message); } - fn is_offline(&mut self) -> bool { - self.num_connected.load(Ordering::Relaxed) == 0 + fn notification_sender( + &self, + target: PeerId, + protocol: Cow<'static, str>, + ) -> Result, NotificationSenderError> { + // We clone the `NotificationsSink` in order to be able to unlock the network-wide + // `peers_notifications_sinks` mutex as soon as possible. + let sink = { + let peers_notifications_sinks = self.peers_notifications_sinks.lock(); + if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) { + sink.clone() + } else { + return Err(NotificationSenderError::Closed) + } + }; + + let notification_size_metric = self + .notifications_sizes_metric + .as_ref() + .map(|histogram| histogram.with_label_values(&["out", &protocol])); + + Ok(Box::new(NotificationSender { sink, protocol_name: protocol, notification_size_metric })) } } -impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for &'a NetworkService { - fn is_major_syncing(&mut self) -> bool { - NetworkService::is_major_syncing(self) +#[async_trait::async_trait] +impl NetworkRequest for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + async fn request( + &self, + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + connect: IfDisconnected, + ) -> Result, RequestFailure> { + let (tx, rx) = oneshot::channel(); + + self.start_request(target, protocol, request, tx, connect); + + match rx.await { + Ok(v) => v, + // The channel can only be closed if the network worker no longer exists. If the + // network worker no longer exists, then all connections to `target` are necessarily + // closed, and we legitimately report this situation as a "ConnectionClosed". + Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)), + } } - fn is_offline(&mut self) -> bool { - self.num_connected.load(Ordering::Relaxed) == 0 + fn start_request( + &self, + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + tx: oneshot::Sender, RequestFailure>>, + connect: IfDisconnected, + ) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { + target, + protocol: protocol.into(), + request, + pending_response: tx, + connect, + }); } } -impl sc_consensus::JustificationSyncLink for NetworkService { - fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - Self::request_justification(self, hash, number); +impl NetworkTransaction for NetworkService +where + B: BlockT + 'static, + H: ExHashT, +{ + fn trigger_repropagate(&self) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions); } - fn clear_justification_requests(&self) { - Self::clear_justification_requests(self); + fn propagate_transaction(&self, hash: H) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash)); } } -impl NetworkStateInfo for NetworkService +impl NetworkBlock> for NetworkService where - B: sp_runtime::traits::Block, + B: BlockT + 'static, H: ExHashT, { - /// Returns the local external addresses. - fn external_addresses(&self) -> Vec { - self.external_addresses.lock().clone() + fn announce_block(&self, hash: B::Hash, data: Option>) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); } - /// Returns the local Peer ID. - fn local_peer_id(&self) -> PeerId { - self.local_peer_id + fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor) { + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number)); } } @@ -1385,26 +1207,27 @@ pub struct NotificationSender { notification_size_metric: Option, } -impl NotificationSender { - /// Returns a future that resolves when the `NotificationSender` is ready to send a - /// notification. - pub async fn ready(&self) -> Result, NotificationSenderError> { - Ok(NotificationSenderReady { +#[async_trait::async_trait] +impl NotificationSenderT for NotificationSender { + async fn ready( + &self, + ) -> Result, NotificationSenderError> { + Ok(Box::new(NotificationSenderReady { ready: match self.sink.reserve_notification().await { - Ok(r) => r, + Ok(r) => Some(r), Err(()) => return Err(NotificationSenderError::Closed), }, peer_id: self.sink.peer_id(), protocol_name: &self.protocol_name, notification_size_metric: self.notification_size_metric.clone(), - }) + })) } } /// Reserved slot in the notifications buffer, ready to accept data. #[must_use] pub struct NotificationSenderReady<'a> { - ready: Ready<'a>, + ready: Option>, /// Target of the notification. peer_id: &'a PeerId, @@ -1417,11 +1240,8 @@ pub struct NotificationSenderReady<'a> { notification_size_metric: Option, } -impl<'a> NotificationSenderReady<'a> { - /// Consumes this slots reservation and actually queues the notification. - pub fn send(self, notification: impl Into>) -> Result<(), NotificationSenderError> { - let notification = notification.into(); - +impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> { + fn send(&mut self, notification: Vec) -> Result<(), NotificationSenderError> { if let Some(notification_size_metric) = &self.notification_size_metric { notification_size_metric.observe(notification.len() as f64); } @@ -1433,26 +1253,14 @@ impl<'a> NotificationSenderReady<'a> { ); trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id); - self.ready.send(notification).map_err(|()| NotificationSenderError::Closed) + self.ready + .take() + .ok_or(NotificationSenderError::Closed)? + .send(notification) + .map_err(|()| NotificationSenderError::Closed) } } -/// Error returned by [`NetworkService::send_notification`]. -#[derive(Debug, thiserror::Error)] -pub enum NotificationSenderError { - /// The notification receiver has been closed, usually because the underlying connection - /// closed. - /// - /// Some of the notifications most recently sent may not have been received. However, - /// the peer may still be connected and a new `NotificationSender` for the same - /// protocol obtained from [`NetworkService::notification_sender`]. - #[error("The notification receiver has been closed")] - Closed, - /// Protocol name hasn't been registered. - #[error("Protocol name hasn't been registered")] - BadProtocol, -} - /// Messages sent from the `NetworkService` to the `NetworkWorker`. /// /// Each entry corresponds to a method of `NetworkService`. diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index c95b46af4cefa..4144d7f19551e 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -31,11 +31,10 @@ //! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the //! collection. -use crate::Event; - use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream}; use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; +use sc_network_common::protocol::event::Event; use std::{ cell::RefCell, fmt, diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index f757bf4891fbc..6ccca17650b67 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -16,11 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config, Event, NetworkService, NetworkWorker}; +use crate::{config, NetworkService, NetworkWorker}; use futures::prelude::*; use libp2p::PeerId; -use sc_network_common::config::ProtocolId; +use sc_network_common::{ + config::ProtocolId, + protocol::event::Event, + service::{NetworkEventStream, NetworkNotification, NetworkPeers, NetworkStateInfo}, +}; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, @@ -192,7 +196,7 @@ fn build_nodes_one_proto() -> ( set_config: config::SetConfig { reserved_nodes: vec![config::MultiaddrWithPeerId { multiaddr: listen_addr, - peer_id: node1.local_peer_id().clone(), + peer_id: node1.local_peer_id(), }], ..Default::default() }, @@ -214,18 +218,10 @@ fn notifications_state_consistent() { // Write some initial notifications that shouldn't get through. for _ in 0..(rand::random::() % 5) { - node1.write_notification( - node2.local_peer_id().clone(), - PROTOCOL_NAME, - b"hello world".to_vec(), - ); + node1.write_notification(node2.local_peer_id(), PROTOCOL_NAME, b"hello world".to_vec()); } for _ in 0..(rand::random::() % 5) { - node2.write_notification( - node1.local_peer_id().clone(), - PROTOCOL_NAME, - b"hello world".to_vec(), - ); + node2.write_notification(node1.local_peer_id(), PROTOCOL_NAME, b"hello world".to_vec()); } async_std::task::block_on(async move { @@ -249,14 +245,14 @@ fn notifications_state_consistent() { // test consists in ensuring that notifications get ignored if the stream isn't open. if rand::random::() % 5 >= 3 { node1.write_notification( - node2.local_peer_id().clone(), + node2.local_peer_id(), PROTOCOL_NAME, b"hello world".to_vec(), ); } if rand::random::() % 5 >= 3 { node2.write_notification( - node1.local_peer_id().clone(), + node1.local_peer_id(), PROTOCOL_NAME, b"hello world".to_vec(), ); @@ -264,10 +260,10 @@ fn notifications_state_consistent() { // Also randomly disconnect the two nodes from time to time. if rand::random::() % 20 == 0 { - node1.disconnect_peer(node2.local_peer_id().clone(), PROTOCOL_NAME); + node1.disconnect_peer(node2.local_peer_id(), PROTOCOL_NAME); } if rand::random::() % 20 == 0 { - node2.disconnect_peer(node1.local_peer_id().clone(), PROTOCOL_NAME); + node2.disconnect_peer(node1.local_peer_id(), PROTOCOL_NAME); } // Grab next event from either `events_stream1` or `events_stream2`. @@ -295,7 +291,7 @@ fn notifications_state_consistent() { something_happened = true; assert!(!node1_to_node2_open); node1_to_node2_open = true; - assert_eq!(remote, *node2.local_peer_id()); + assert_eq!(remote, node2.local_peer_id()); }, future::Either::Right(Event::NotificationStreamOpened { remote, protocol, .. @@ -304,7 +300,7 @@ fn notifications_state_consistent() { something_happened = true; assert!(!node2_to_node1_open); node2_to_node1_open = true; - assert_eq!(remote, *node1.local_peer_id()); + assert_eq!(remote, node1.local_peer_id()); }, future::Either::Left(Event::NotificationStreamClosed { remote, protocol, .. @@ -312,7 +308,7 @@ fn notifications_state_consistent() { if protocol == PROTOCOL_NAME { assert!(node1_to_node2_open); node1_to_node2_open = false; - assert_eq!(remote, *node2.local_peer_id()); + assert_eq!(remote, node2.local_peer_id()); }, future::Either::Right(Event::NotificationStreamClosed { remote, protocol, .. @@ -320,14 +316,14 @@ fn notifications_state_consistent() { if protocol == PROTOCOL_NAME { assert!(node2_to_node1_open); node2_to_node1_open = false; - assert_eq!(remote, *node1.local_peer_id()); + assert_eq!(remote, node1.local_peer_id()); }, future::Either::Left(Event::NotificationsReceived { remote, .. }) => { assert!(node1_to_node2_open); - assert_eq!(remote, *node2.local_peer_id()); + assert_eq!(remote, node2.local_peer_id()); if rand::random::() % 5 >= 4 { node1.write_notification( - node2.local_peer_id().clone(), + node2.local_peer_id(), PROTOCOL_NAME, b"hello world".to_vec(), ); @@ -335,10 +331,10 @@ fn notifications_state_consistent() { }, future::Either::Right(Event::NotificationsReceived { remote, .. }) => { assert!(node2_to_node1_open); - assert_eq!(remote, *node1.local_peer_id()); + assert_eq!(remote, node1.local_peer_id()); if rand::random::() % 5 >= 4 { node2.write_notification( - node1.local_peer_id().clone(), + node1.local_peer_id(), PROTOCOL_NAME, b"hello world".to_vec(), ); @@ -373,7 +369,7 @@ fn lots_of_incoming_peers_works() { ..config::NetworkConfiguration::new_local() }); - let main_node_peer_id = *main_node.local_peer_id(); + let main_node_peer_id = main_node.local_peer_id(); // We spawn background tasks and push them in this `Vec`. They will all be waited upon before // this test ends. @@ -476,8 +472,13 @@ fn notifications_back_pressure() { // Sending! for num in 0..TOTAL_NOTIFS { - let notif = node1.notification_sender(node2_id.clone(), PROTOCOL_NAME).unwrap(); - notif.ready().await.unwrap().send(format!("hello #{}", num)).unwrap(); + let notif = node1.notification_sender(node2_id, PROTOCOL_NAME).unwrap(); + notif + .ready() + .await + .unwrap() + .send(format!("hello #{}", num).into_bytes()) + .unwrap(); } receiver.await; @@ -514,7 +515,7 @@ fn fallback_name_working() { set_config: config::SetConfig { reserved_nodes: vec![config::MultiaddrWithPeerId { multiaddr: listen_addr, - peer_id: node1.local_peer_id().clone(), + peer_id: node1.local_peer_id(), }], ..Default::default() }, diff --git a/client/network/src/transactions.rs b/client/network/src/transactions.rs index 342b6a0430272..f7e4d774ca812 100644 --- a/client/network/src/transactions.rs +++ b/client/network/src/transactions.rs @@ -32,7 +32,7 @@ use crate::{ protocol::message, service::NetworkService, utils::{interval, LruHashSet}, - Event, ExHashT, ObservedRole, + ExHashT, }; use codec::{Decode, Encode}; @@ -40,7 +40,11 @@ use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered}; use libp2p::{multiaddr, PeerId}; use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; -use sc_network_common::config::ProtocolId; +use sc_network_common::{ + config::ProtocolId, + protocol::event::{Event, ObservedRole}, + service::{NetworkEventStream, NetworkNotification, NetworkPeers}, +}; use sp_runtime::traits::Block as BlockT; use std::{ borrow::Cow, @@ -176,7 +180,7 @@ impl TransactionsHandlerPrototype { transaction_pool: Arc>, metrics_registry: Option<&Registry>, ) -> error::Result<(TransactionsHandler, TransactionsHandlerController)> { - let event_stream = service.event_stream("transactions-handler").boxed(); + let event_stream = service.event_stream("transactions-handler"); let (to_handler, from_controller) = mpsc::unbounded(); let gossip_enabled = Arc::new(AtomicBool::new(false)); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 09b4139c213a6..3ba663aba2267 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -45,7 +45,8 @@ use sc_client_api::{ }; use sc_consensus::{ BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, - ForkChoiceStrategy, ImportResult, JustificationImport, LongestChain, Verifier, + ForkChoiceStrategy, ImportResult, JustificationImport, JustificationSyncLink, LongestChain, + Verifier, }; pub use sc_network::config::EmptyTransactionPool; use sc_network::{ @@ -56,8 +57,9 @@ use sc_network::{ Multiaddr, NetworkService, NetworkWorker, }; pub use sc_network_common::config::ProtocolId; -use sc_network_common::sync::warp::{ - AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider, +use sc_network_common::{ + service::{NetworkBlock, NetworkStateInfo, NetworkSyncForkRequest}, + sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider}, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ @@ -71,7 +73,7 @@ use sp_blockchain::{ }; use sp_consensus::{ block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator}, - BlockOrigin, Error as ConsensusError, + BlockOrigin, Error as ConsensusError, SyncOracle, }; use sp_core::H256; use sp_runtime::{ @@ -243,7 +245,7 @@ where { /// Get this peer ID. pub fn id(&self) -> PeerId { - *self.network.service().local_peer_id() + self.network.service().local_peer_id() } /// Returns true if we're major syncing. @@ -797,7 +799,7 @@ where let addrs = connect_to .iter() .map(|v| { - let peer_id = *self.peer(*v).network_service().local_peer_id(); + let peer_id = self.peer(*v).network_service().local_peer_id(); let multiaddr = self.peer(*v).listen_addr.clone(); MultiaddrWithPeerId { peer_id, multiaddr } }) @@ -893,7 +895,7 @@ where self.mut_peers(move |peers| { for peer in peers.iter_mut() { peer.network - .add_known_address(*network.service().local_peer_id(), listen_addr.clone()); + .add_known_address(network.service().local_peer_id(), listen_addr.clone()); } let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse()); diff --git a/client/offchain/Cargo.toml b/client/offchain/Cargo.toml index 8da2d4be3adde..ab26c0c38596c 100644 --- a/client/offchain/Cargo.toml +++ b/client/offchain/Cargo.toml @@ -29,6 +29,7 @@ threadpool = "1.7" tracing = "0.1.29" sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-core = { version = "6.0.0", path = "../../primitives/core" } diff --git a/client/offchain/src/api.rs b/client/offchain/src/api.rs index c80b511c84d17..f379ebad17bbf 100644 --- a/client/offchain/src/api.rs +++ b/client/offchain/src/api.rs @@ -325,19 +325,88 @@ impl AsyncApi { mod tests { use super::*; use sc_client_db::offchain::LocalStorage; - use sc_network::{NetworkStateInfo, PeerId}; + use sc_network::{PeerId, ReputationChange}; + use sc_network_common::service::{NetworkPeers, NetworkStateInfo}; use sp_core::offchain::{DbExternalities, Externalities}; - use std::time::SystemTime; + use std::{borrow::Cow, time::SystemTime}; pub(super) struct TestNetwork(); - impl NetworkProvider for TestNetwork { + impl NetworkPeers for TestNetwork { fn set_authorized_peers(&self, _peers: HashSet) { - unimplemented!() + unimplemented!(); } fn set_authorized_only(&self, _reserved_only: bool) { - unimplemented!() + unimplemented!(); + } + + fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) { + unimplemented!(); + } + + fn report_peer(&self, _who: PeerId, _cost_benefit: ReputationChange) { + unimplemented!(); + } + + fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) { + unimplemented!(); + } + + fn accept_unreserved_peers(&self) { + unimplemented!(); + } + + fn deny_unreserved_peers(&self) { + unimplemented!(); + } + + fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { + unimplemented!(); + } + + fn remove_reserved_peer(&self, _peer_id: PeerId) { + unimplemented!(); + } + + fn set_reserved_peers( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn add_peers_to_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_peers_from_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: Vec, + ) { + unimplemented!(); + } + + fn add_to_peers_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec) { + unimplemented!(); + } + + fn sync_num_connected(&self) -> usize { + unimplemented!(); } } diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index d54d491b04c43..14a2408e61a70 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -35,14 +35,14 @@ #![warn(missing_docs)] -use std::{collections::HashSet, fmt, marker::PhantomData, sync::Arc}; +use std::{fmt, marker::PhantomData, sync::Arc}; use futures::{ future::{ready, Future}, prelude::*, }; use parking_lot::Mutex; -use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId}; +use sc_network_common::service::{NetworkPeers, NetworkStateInfo}; use sp_api::{ApiExt, ProvideRuntimeApi}; use sp_core::{offchain, traits::SpawnNamed, ExecutionContext}; use sp_runtime::{ @@ -60,27 +60,9 @@ const LOG_TARGET: &str = "offchain-worker"; /// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the /// underlying Substrate networking. -pub trait NetworkProvider: NetworkStateInfo { - /// Set the authorized peers. - fn set_authorized_peers(&self, peers: HashSet); +pub trait NetworkProvider: NetworkStateInfo + NetworkPeers {} - /// Set the authorized only flag. - fn set_authorized_only(&self, reserved_only: bool); -} - -impl NetworkProvider for NetworkService -where - B: traits::Block + 'static, - H: ExHashT, -{ - fn set_authorized_peers(&self, peers: HashSet) { - NetworkService::set_authorized_peers(self, peers) - } - - fn set_authorized_only(&self, reserved_only: bool) { - NetworkService::set_authorized_only(self, reserved_only) - } -} +impl NetworkProvider for T where T: NetworkStateInfo + NetworkPeers {} /// Options for [`OffchainWorkers`] pub struct OffchainWorkerOptions { @@ -266,11 +248,11 @@ mod tests { use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider as _; use sc_client_api::Backend as _; - use sc_network::{Multiaddr, PeerId}; + use sc_network::{Multiaddr, PeerId, ReputationChange}; use sc_transaction_pool::{BasicPool, FullChainApi}; use sc_transaction_pool_api::{InPoolTransaction, TransactionPool}; use sp_consensus::BlockOrigin; - use std::sync::Arc; + use std::{borrow::Cow, collections::HashSet, sync::Arc}; use substrate_test_runtime_client::{ runtime::Block, ClientBlockImportExt, DefaultTestClientBuilderExt, TestClient, TestClientBuilderExt, @@ -288,13 +270,81 @@ mod tests { } } - impl NetworkProvider for TestNetwork { + impl NetworkPeers for TestNetwork { fn set_authorized_peers(&self, _peers: HashSet) { - unimplemented!() + unimplemented!(); } fn set_authorized_only(&self, _reserved_only: bool) { - unimplemented!() + unimplemented!(); + } + + fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) { + unimplemented!(); + } + + fn report_peer(&self, _who: PeerId, _cost_benefit: ReputationChange) { + unimplemented!(); + } + + fn disconnect_peer(&self, _who: PeerId, _protocol: Cow<'static, str>) { + unimplemented!(); + } + + fn accept_unreserved_peers(&self) { + unimplemented!(); + } + + fn deny_unreserved_peers(&self) { + unimplemented!(); + } + + fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { + unimplemented!(); + } + + fn remove_reserved_peer(&self, _peer_id: PeerId) { + unimplemented!(); + } + + fn set_reserved_peers( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn add_peers_to_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_peers_from_reserved_set( + &self, + _protocol: Cow<'static, str>, + _peers: Vec, + ) { + unimplemented!(); + } + + fn add_to_peers_set( + &self, + _protocol: Cow<'static, str>, + _peers: HashSet, + ) -> Result<(), String> { + unimplemented!(); + } + + fn remove_from_peers_set(&self, _protocol: Cow<'static, str>, _peers: Vec) { + unimplemented!(); + } + + fn sync_num_connected(&self) -> usize { + unimplemented!(); } } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 55c0006de33fb..f81143583eacf 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -38,7 +38,10 @@ use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; use sc_network::{config::SyncMode, NetworkService}; -use sc_network_common::sync::warp::WarpSyncProvider; +use sc_network_common::{ + service::{NetworkStateInfo, NetworkStatusProvider, NetworkTransaction}, + sync::warp::WarpSyncProvider, +}; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, @@ -319,6 +322,31 @@ where ) } +/// Shared network instance implementing a set of mandatory traits. +pub trait SpawnTaskNetwork: + sc_offchain::NetworkProvider + + NetworkStateInfo + + NetworkTransaction + + NetworkStatusProvider + + Send + + Sync + + 'static +{ +} + +impl SpawnTaskNetwork for T +where + Block: BlockT, + T: sc_offchain::NetworkProvider + + NetworkStateInfo + + NetworkTransaction + + NetworkStatusProvider + + Send + + Sync + + 'static, +{ +} + /// Parameters to pass into `build`. pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { /// The service configuration. @@ -337,7 +365,7 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { pub rpc_builder: Box Result, Error>>, /// A shared network instance. - pub network: Arc::Hash>>, + pub network: Arc>, /// A Sender for RPC requests. pub system_rpc_tx: TracingUnboundedSender>, /// Telemetry instance for this node. @@ -349,7 +377,7 @@ pub fn build_offchain_workers( config: &Configuration, spawn_handle: SpawnTaskHandle, client: Arc, - network: Arc::Hash>>, + network: Arc, ) -> Option>> where TBl: BlockT, @@ -516,13 +544,14 @@ where Ok(rpc_handlers) } -async fn transaction_notifications( - transaction_pool: Arc, - network: Arc::Hash>>, +async fn transaction_notifications( + transaction_pool: Arc, + network: Network, telemetry: Option, ) where - TBl: BlockT, - TExPool: MaintainedTransactionPool::Hash>, + Block: BlockT, + ExPool: MaintainedTransactionPool::Hash>, + Network: NetworkTransaction<::Hash> + Send + Sync, { // transaction notifications transaction_pool @@ -542,13 +571,18 @@ async fn transaction_notifications( .await; } -fn init_telemetry>( +fn init_telemetry( config: &mut Configuration, - network: Arc::Hash>>, - client: Arc, + network: Network, + client: Arc, telemetry: &mut Telemetry, sysinfo: Option, -) -> sc_telemetry::Result { +) -> sc_telemetry::Result +where + Block: BlockT, + Client: BlockBackend, + Network: NetworkStateInfo, +{ let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default(); let connection_message = ConnectionMessage { name: config.network.node_name.to_owned(), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 5291d219e8102..2f35a04451e79 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -42,9 +42,11 @@ use jsonrpsee::{core::Error as JsonRpseeError, RpcModule}; use log::{debug, error, warn}; use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider}; use sc_network::PeerId; +use sc_network_common::service::NetworkBlock; use sc_rpc_server::WsConfig; use sc_utils::mpsc::TracingUnboundedReceiver; use sp_blockchain::HeaderMetadata; +use sp_consensus::SyncOracle; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as HeaderT}, diff --git a/client/service/src/metrics.rs b/client/service/src/metrics.rs index ef3132f61ab99..13b249a7b9563 100644 --- a/client/service/src/metrics.rs +++ b/client/service/src/metrics.rs @@ -22,7 +22,8 @@ use crate::config::Configuration; use futures_timer::Delay; use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_client_api::{ClientInfo, UsageProvider}; -use sc_network::{config::Role, NetworkService, NetworkStatus}; +use sc_network::config::Role; +use sc_network_common::service::{NetworkStatus, NetworkStatusProvider}; use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO}; use sc_transaction_pool_api::{MaintainedTransactionPool, PoolStatus}; use sc_utils::metrics::register_globals; @@ -182,15 +183,16 @@ impl MetricsService { /// Returns a never-ending `Future` that performs the /// metric and telemetry updates with information from /// the given sources. - pub async fn run( + pub async fn run( mut self, client: Arc, transactions: Arc, - network: Arc::Hash>>, + network: TNet, ) where TBl: Block, TCl: ProvideRuntimeApi + UsageProvider, TExPool: MaintainedTransactionPool::Hash>, + TNet: NetworkStatusProvider, { let mut timer = Delay::new(Duration::from_secs(0)); let timer_interval = Duration::from_secs(5); diff --git a/client/service/test/Cargo.toml b/client/service/test/Cargo.toml index d003db57eb7ac..01c3ee2348ef5 100644 --- a/client/service/test/Cargo.toml +++ b/client/service/test/Cargo.toml @@ -27,6 +27,7 @@ sc-client-db = { version = "0.10.0-dev", default-features = false, path = "../.. sc-consensus = { version = "0.10.0-dev", path = "../../../client/consensus/common" } sc-executor = { version = "0.10.0-dev", path = "../../executor" } sc-network = { version = "0.10.0-dev", path = "../../network" } +sc-network-common = { version = "0.10.0-dev", path = "../../network/common" } sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../../service" } sc-transaction-pool-api = { version = "4.0.0-dev", path = "../../../client/transaction-pool/api" } sp-api = { version = "4.0.0-dev", path = "../../../primitives/api" } diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 2d63362daffba..9c720c6fedea0 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -26,6 +26,7 @@ use sc_network::{ config::{NetworkConfiguration, TransportConfig}, multiaddr, Multiaddr, }; +use sc_network_common::service::{NetworkBlock, NetworkPeers, NetworkStateInfo}; use sc_service::{ client::Client, config::{BasePath, DatabaseSource, KeystoreConfig}, @@ -320,7 +321,7 @@ where handle.spawn(service.clone().map_err(|_| ())); let addr = - addr.with(multiaddr::Protocol::P2p((*service.network().local_peer_id()).into())); + addr.with(multiaddr::Protocol::P2p((service.network().local_peer_id()).into())); self.authority_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; } @@ -340,7 +341,7 @@ where handle.spawn(service.clone().map_err(|_| ())); let addr = - addr.with(multiaddr::Protocol::P2p((*service.network().local_peer_id()).into())); + addr.with(multiaddr::Protocol::P2p((service.network().local_peer_id()).into())); self.full_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; } @@ -387,7 +388,7 @@ where } network.run_until_all_full(move |_index, service| { - let connected = service.network().num_connected(); + let connected = service.network().sync_num_connected(); debug!("Got {}/{} full connections...", connected, expected_full_connections); connected == expected_full_connections }); @@ -422,7 +423,7 @@ where } network.run_until_all_full(move |_index, service| { - let connected = service.network().num_connected(); + let connected = service.network().sync_num_connected(); debug!("Got {}/{} full connections...", connected, expected_full_connections); connected == expected_full_connections }); diff --git a/primitives/consensus/common/src/lib.rs b/primitives/consensus/common/src/lib.rs index 4539cec2c8e0a..043533cbf2258 100644 --- a/primitives/consensus/common/src/lib.rs +++ b/primitives/consensus/common/src/lib.rs @@ -236,10 +236,10 @@ pub trait Proposer { pub trait SyncOracle { /// Whether the synchronization service is undergoing major sync. /// Returns true if so. - fn is_major_syncing(&mut self) -> bool; + fn is_major_syncing(&self) -> bool; /// Whether the synchronization service is offline. /// Returns true if so. - fn is_offline(&mut self) -> bool; + fn is_offline(&self) -> bool; } /// A synchronization oracle for when there is no network. @@ -247,10 +247,10 @@ pub trait SyncOracle { pub struct NoNetwork; impl SyncOracle for NoNetwork { - fn is_major_syncing(&mut self) -> bool { + fn is_major_syncing(&self) -> bool { false } - fn is_offline(&mut self) -> bool { + fn is_offline(&self) -> bool { false } } @@ -258,14 +258,14 @@ impl SyncOracle for NoNetwork { impl SyncOracle for Arc where T: ?Sized, - for<'r> &'r T: SyncOracle, + T: SyncOracle, { - fn is_major_syncing(&mut self) -> bool { - <&T>::is_major_syncing(&mut &**self) + fn is_major_syncing(&self) -> bool { + T::is_major_syncing(self) } - fn is_offline(&mut self) -> bool { - <&T>::is_offline(&mut &**self) + fn is_offline(&self) -> bool { + T::is_offline(self) } }