diff --git a/prdoc/pr_4015.prdoc b/prdoc/pr_4015.prdoc new file mode 100644 index 000000000000..ede1731c4ab8 --- /dev/null +++ b/prdoc/pr_4015.prdoc @@ -0,0 +1,14 @@ +title: Improve beefy networking code by forwarding data more directly + +doc: + - audience: Node Operator + description: | + Improve internal implementation of beefy to forward data directly to the + networking layer instead of first storing them internally. So, the + following error message should not appear again: + ``` + The number of unprocessed messages in channel `mpsc_beefy_gossip_validator` exceeded 100000. + ``` + +crates: + - name: sc-consensus-beefy diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index eb43c9173d75..d31559131cc1 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -18,21 +18,17 @@ use std::{collections::BTreeSet, sync::Arc, time::Duration}; -use sc_network::{PeerId, ReputationChange}; +use sc_network::{NetworkPeers, PeerId, ReputationChange}; use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext}; use sp_runtime::traits::{Block, Hash, Header, NumberFor}; use codec::{Decode, DecodeAll, Encode}; use log::{debug, trace}; use parking_lot::{Mutex, RwLock}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use wasm_timer::Instant; use crate::{ - communication::{ - benefit, cost, - peers::{KnownPeers, PeerReport}, - }, + communication::{benefit, cost, peers::KnownPeers}, justification::{ proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof, }, @@ -223,7 +219,7 @@ impl Filter { /// rejected/expired. /// ///All messaging is handled in a single BEEFY global topic. -pub(crate) struct GossipValidator +pub(crate) struct GossipValidator where B: Block, { @@ -232,26 +228,22 @@ where gossip_filter: RwLock>, next_rebroadcast: Mutex, known_peers: Arc>>, - report_sender: TracingUnboundedSender, + network: Arc, } -impl GossipValidator +impl GossipValidator where B: Block, { - pub(crate) fn new( - known_peers: Arc>>, - ) -> (GossipValidator, TracingUnboundedReceiver) { - let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000); - let val = GossipValidator { + pub(crate) fn new(known_peers: Arc>>, network: Arc) -> Self { + Self { votes_topic: votes_topic::(), justifs_topic: proofs_topic::(), gossip_filter: RwLock::new(Filter::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), known_peers, - report_sender: tx, - }; - (val, rx) + network, + } } /// Update gossip validator filter. @@ -265,9 +257,15 @@ where ); self.gossip_filter.write().update(filter); } +} +impl GossipValidator +where + B: Block, + N: NetworkPeers, +{ fn report(&self, who: PeerId, cost_benefit: ReputationChange) { - let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); + self.network.report_peer(who, cost_benefit); } fn validate_vote( @@ -370,9 +368,10 @@ where } } -impl Validator for GossipValidator +impl Validator for GossipValidator where B: Block, + N: NetworkPeers + Send + Sync, { fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, who: &PeerId) { self.known_peers.lock().remove(who); @@ -486,7 +485,7 @@ where #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::keystore::BeefyKeystore; + use crate::{communication::peers::PeerReport, keystore::BeefyKeystore}; use sc_network_test::Block; use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE; use sp_consensus_beefy::{ @@ -495,20 +494,109 @@ pub(crate) mod tests { }; use sp_keystore::{testing::MemoryKeystore, Keystore}; + pub(crate) struct TestNetwork { + report_sender: futures::channel::mpsc::UnboundedSender, + } + + impl TestNetwork { + pub fn new() -> (Self, futures::channel::mpsc::UnboundedReceiver) { + let (tx, rx) = futures::channel::mpsc::unbounded(); + + (Self { report_sender: tx }, rx) + } + } + + impl NetworkPeers for TestNetwork { + fn set_authorized_peers(&self, _: std::collections::HashSet) { + unimplemented!() + } + + fn set_authorized_only(&self, _: bool) { + unimplemented!() + } + + fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) { + unimplemented!() + } + + fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) { + let _ = self.report_sender.unbounded_send(PeerReport { who: peer_id, cost_benefit }); + } + + fn peer_reputation(&self, _: &PeerId) -> i32 { + unimplemented!() + } + + fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) { + unimplemented!() + } + + fn accept_unreserved_peers(&self) { + unimplemented!() + } + + fn deny_unreserved_peers(&self) { + unimplemented!() + } + + fn add_reserved_peer( + &self, + _: sc_network::config::MultiaddrWithPeerId, + ) -> Result<(), String> { + unimplemented!() + } + + fn remove_reserved_peer(&self, _: PeerId) { + unimplemented!() + } + + fn set_reserved_peers( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + unimplemented!() + } + + fn add_peers_to_reserved_set( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + unimplemented!() + } + + fn remove_peers_from_reserved_set( + &self, + _: sc_network::ProtocolName, + _: Vec, + ) -> Result<(), String> { + unimplemented!() + } + + fn sync_num_connected(&self) -> usize { + unimplemented!() + } + + fn peer_role(&self, _: PeerId, _: Vec) -> Option { + unimplemented!() + } + } + struct TestContext; impl ValidatorContext for TestContext { fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) { - todo!() + unimplemented!() } fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec, _force: bool) {} fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec) { - todo!() + unimplemented!() } fn send_topic(&mut self, _who: &sc_network::PeerId, _topic: B::Hash, _force: bool) { - todo!() + unimplemented!() } } @@ -560,8 +648,13 @@ pub(crate) mod tests { fn should_validate_messages() { let keys = vec![Keyring::::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let (gv, mut report_stream) = - GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + + let (network, mut report_stream) = TestNetwork::new(); + + let gv = GossipValidator::::new( + Arc::new(Mutex::new(KnownPeers::new())), + Arc::new(network), + ); let sender = PeerId::random(); let mut context = TestContext; @@ -574,7 +667,7 @@ pub(crate) mod tests { let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost }; let res = gv.validate(&mut context, &sender, bad_encoding); assert!(matches!(res, ValidationResult::Discard)); - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // verify votes validation @@ -585,14 +678,14 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); // nothing reported - assert!(report_stream.try_recv().is_err()); + assert!(report_stream.try_next().is_err()); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); // nothing in cache first time let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VOTE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject vote, voter not in validator set let mut bad_vote = vote.clone(); @@ -601,7 +694,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &bad_vote); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::UNKNOWN_VOTER; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject if the round is not GRANDPA finalized gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set }); @@ -611,7 +704,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::FUTURE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject if the round is not live anymore gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set }); @@ -621,7 +714,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::OUTDATED_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // now verify proofs validation @@ -631,7 +724,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::OUTDATED_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // accept next proof with good set_id let proof = dummy_proof(7, &validator_set); @@ -639,7 +732,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VALIDATED_PROOF; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // accept future proof with good set_id let proof = dummy_proof(20, &validator_set); @@ -647,7 +740,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VALIDATED_PROOF; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject proof, future set_id let bad_validator_set = ValidatorSet::::new(keys, 1).unwrap(); @@ -656,7 +749,7 @@ pub(crate) mod tests { let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::FUTURE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); // reject proof, bad signatures (Bob instead of Alice) let bad_validator_set = @@ -667,14 +760,17 @@ pub(crate) mod tests { assert!(matches!(res, ValidationResult::Discard)); expected_report.cost_benefit = cost::INVALID_PROOF; expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); + assert_eq!(report_stream.try_next().unwrap().unwrap(), expected_report); } #[test] fn messages_allowed_and_expired() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let (gv, _) = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let gv = GossipValidator::::new( + Arc::new(Mutex::new(KnownPeers::new())), + Arc::new(TestNetwork::new().0), + ); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); @@ -751,7 +847,10 @@ pub(crate) mod tests { fn messages_rebroadcast() { let keys = vec![Keyring::Alice.public()]; let validator_set = ValidatorSet::::new(keys.clone(), 0).unwrap(); - let (gv, _) = GossipValidator::::new(Arc::new(Mutex::new(KnownPeers::new()))); + let gv = GossipValidator::::new( + Arc::new(Mutex::new(KnownPeers::new())), + Arc::new(TestNetwork::new().0), + ); gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set }); let sender = sc_network::PeerId::random(); let topic = Default::default(); diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 714a0fb7c885..2637481fbf3e 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -68,7 +68,7 @@ pub mod import; pub mod justification; use crate::{ - communication::{gossip::GossipValidator, peers::PeerReport}, + communication::gossip::GossipValidator, justification::BeefyVersionedFinalityProof, keystore::BeefyKeystore, metrics::VoterMetrics, @@ -78,7 +78,6 @@ use crate::{ pub use communication::beefy_protocol_name::{ gossip_protocol_name, justifications_protocol_name as justifs_protocol_name, }; -use sc_utils::mpsc::TracingUnboundedReceiver; use sp_runtime::generic::OpaqueDigestItemId; #[cfg(test)] @@ -228,10 +227,9 @@ pub struct BeefyParams { /// Helper object holding BEEFY worker communication/gossip components. /// /// These are created once, but will be reused if worker is restarted/reinitialized. -pub(crate) struct BeefyComms { +pub(crate) struct BeefyComms { pub gossip_engine: GossipEngine, - pub gossip_validator: Arc>, - pub gossip_report_stream: TracingUnboundedReceiver, + pub gossip_validator: Arc>, pub on_demand_justifications: OnDemandJustificationsEngine, } @@ -264,13 +262,13 @@ where /// persisted state in AUX DB and latest chain information/progress. /// /// Returns a sane `BeefyWorkerBuilder` that can build the `BeefyWorker`. - pub async fn async_initialize( + pub async fn async_initialize( backend: Arc, runtime: Arc, key_store: BeefyKeystore, metrics: Option, min_block_delta: u32, - gossip_validator: Arc>, + gossip_validator: Arc>, finality_notifications: &mut Fuse>, is_authority: bool, ) -> Result { @@ -298,15 +296,15 @@ where } /// Takes rest of missing pieces as params and builds the `BeefyWorker`. - pub fn build( + pub fn build( self, payload_provider: P, sync: Arc, - comms: BeefyComms, + comms: BeefyComms, links: BeefyVoterLinks, pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, is_authority: bool, - ) -> BeefyWorker { + ) -> BeefyWorker { BeefyWorker { backend: self.backend, runtime: self.runtime, @@ -526,8 +524,8 @@ pub async fn start_beefy_gadget( let known_peers = Arc::new(Mutex::new(KnownPeers::new())); // Default votes filter is to discard everything. // Validator is updated later with correct starting round and set id. - let (gossip_validator, gossip_report_stream) = - communication::gossip::GossipValidator::new(known_peers.clone()); + let gossip_validator = + communication::gossip::GossipValidator::new(known_peers.clone(), network.clone()); let gossip_validator = Arc::new(gossip_validator); let gossip_engine = GossipEngine::new( network.clone(), @@ -546,46 +544,35 @@ pub async fn start_beefy_gadget( known_peers, prometheus_registry.clone(), ); - let mut beefy_comms = BeefyComms { - gossip_engine, - gossip_validator, - gossip_report_stream, - on_demand_justifications, - }; + let mut beefy_comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications }; // We re-create and re-run the worker in this loop in order to quickly reinit and resume after // select recoverable errors. loop { // Make sure to pump gossip engine while waiting for initialization conditions. - let worker_builder = loop { - futures::select! { - builder_init_result = BeefyWorkerBuilder::async_initialize( - backend.clone(), - runtime.clone(), - key_store.clone().into(), - metrics.clone(), - min_block_delta, - beefy_comms.gossip_validator.clone(), - &mut finality_notifications, - is_authority, - ).fuse() => { - match builder_init_result { - Ok(builder) => break builder, - Err(e) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e); - return - }, - } - }, - // Pump peer reports - _ = &mut beefy_comms.gossip_report_stream.next() => { - continue - }, - // Pump gossip engine. - _ = &mut beefy_comms.gossip_engine => { - error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); - return + let worker_builder = futures::select! { + builder_init_result = BeefyWorkerBuilder::async_initialize( + backend.clone(), + runtime.clone(), + key_store.clone().into(), + metrics.clone(), + min_block_delta, + beefy_comms.gossip_validator.clone(), + &mut finality_notifications, + is_authority, + ).fuse() => { + match builder_init_result { + Ok(builder) => builder, + Err(e) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e); + return + }, } + }, + // Pump gossip engine. + _ = &mut beefy_comms.gossip_engine => { + error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); + return } }; diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index aecfec7b9ed1..d4ec6ffd497b 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -23,8 +23,9 @@ use crate::{ beefy_block_import_and_links, communication::{ gossip::{ - proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage, - GossipValidator, + proofs_topic, + tests::{sign_commitment, TestNetwork}, + votes_topic, GossipFilterCfg, GossipMessage, GossipValidator, }, request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, @@ -1450,7 +1451,7 @@ async fn gossipped_finality_proofs() { let charlie = &mut net.peers[2]; let known_peers = Arc::new(Mutex::new(KnownPeers::::new())); // Charlie will run just the gossip engine and not the full voter. - let (gossip_validator, _) = GossipValidator::new(known_peers); + let gossip_validator = GossipValidator::new(known_peers, Arc::new(TestNetwork::new().0)); let charlie_gossip_validator = Arc::new(gossip_validator); charlie_gossip_validator.update_filter(GossipFilterCfg:: { start: 1, diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index ac6b72d1ea40..05575ae01c30 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -19,7 +19,6 @@ use crate::{ communication::{ gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage}, - peers::PeerReport, request_response::outgoing_requests_engine::ResponseInfo, }, error::Error, @@ -374,7 +373,7 @@ impl PersistedState { } /// A BEEFY worker/voter that follows the BEEFY protocol -pub(crate) struct BeefyWorker { +pub(crate) struct BeefyWorker { // utilities pub backend: Arc, pub runtime: Arc, @@ -383,7 +382,7 @@ pub(crate) struct BeefyWorker { pub sync: Arc, // communication (created once, but returned and reused if worker is restarted/reinitialized) - pub comms: BeefyComms, + pub comms: BeefyComms, // channels /// Links between the block importer, the background voter and the RPC layer. @@ -400,7 +399,7 @@ pub(crate) struct BeefyWorker { pub is_authority: bool, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, @@ -827,7 +826,7 @@ where mut self, block_import_justif: &mut Fuse>>, finality_notifications: &mut Fuse>, - ) -> (Error, BeefyComms) { + ) -> (Error, BeefyComms) { info!( target: LOG_TARGET, "🥩 run BEEFY worker, best grandpa: #{:?}.", @@ -896,11 +895,8 @@ where }, ResponseInfo::PeerReport(peer_report) => { self.comms.gossip_engine.report(peer_report.who, peer_report.cost_benefit); - continue; - }, - ResponseInfo::Pending => { - continue; }, + ResponseInfo::Pending => {}, } }, justif = block_import_justif.next() => { @@ -935,13 +931,6 @@ where break Error::VotesGossipStreamTerminated; } }, - // Process peer reports. - report = self.comms.gossip_report_stream.next() => { - if let Some(PeerReport { who, cost_benefit }) = report { - self.comms.gossip_engine.report(who, cost_benefit); - } - continue; - }, } // Act on changed 'state'. @@ -1054,7 +1043,7 @@ pub(crate) mod tests { use super::*; use crate::{ communication::{ - gossip::GossipValidator, + gossip::{tests::TestNetwork, GossipValidator}, notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream}, request_response::outgoing_requests_engine::OnDemandJustificationsEngine, }, @@ -1111,6 +1100,7 @@ pub(crate) mod tests { MmrRootProvider, TestApi, Arc>, + TestNetwork, > { let keystore = create_beefy_keystore(key); @@ -1140,7 +1130,8 @@ pub(crate) mod tests { .take_notification_service(&crate::tests::beefy_gossip_proto_name()) .unwrap(); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone()); + let gossip_validator = + GossipValidator::new(known_peers.clone(), Arc::new(TestNetwork::new().0)); let gossip_validator = Arc::new(gossip_validator); let gossip_engine = GossipEngine::new( network.clone(), @@ -1173,12 +1164,7 @@ pub(crate) mod tests { ) .unwrap(); let payload_provider = MmrRootProvider::new(api.clone()); - let comms = BeefyComms { - gossip_engine, - gossip_validator, - gossip_report_stream, - on_demand_justifications, - }; + let comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications }; BeefyWorker { backend, runtime: api,