diff --git a/client/beefy/src/communication/gossip.rs b/client/beefy/src/communication/gossip.rs index 520548b943f96..bbc35ac8e526e 100644 --- a/client/beefy/src/communication/gossip.rs +++ b/client/beefy/src/communication/gossip.rs @@ -139,6 +139,10 @@ impl Validator for GossipValidator where B: Block, { + fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, who: &PeerId) { + self.known_peers.lock().remove(who); + } + fn validate( &self, _context: &mut dyn ValidatorContext, diff --git a/client/beefy/src/communication/peers.rs b/client/beefy/src/communication/peers.rs index 0e20a0f4e0ff6..d7927ea72e661 100644 --- a/client/beefy/src/communication/peers.rs +++ b/client/beefy/src/communication/peers.rs @@ -46,11 +46,6 @@ impl KnownPeers { Self { live: HashMap::new() } } - /// Add new connected `peer`. - pub fn add_new(&mut self, peer: PeerId) { - self.live.entry(peer).or_default(); - } - /// Note vote round number for `peer`. pub fn note_vote_for(&mut self, peer: PeerId, round: NumberFor) { let data = self.live.entry(peer).or_default(); @@ -87,16 +82,13 @@ mod tests { let mut peers = KnownPeers::::new(); assert!(peers.live.is_empty()); - // Alice and Bob new connected peers. - peers.add_new(alice); - peers.add_new(bob); // 'Tracked' Bob seen voting for 5. peers.note_vote_for(bob, 5); // Previously unseen Charlie now seen voting for 10. peers.note_vote_for(charlie, 10); - assert_eq!(peers.live.len(), 3); - assert!(peers.contains(&alice)); + assert_eq!(peers.live.len(), 2); + assert!(!peers.contains(&alice)); assert!(peers.contains(&bob)); assert!(peers.contains(&charlie)); diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index 3bdd13982aea2..9dccd4236bef3 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -241,11 +241,12 @@ where None, ); + // The `GossipValidator` adds and removes known peers based on valid votes and network events. let on_demand_justifications = OnDemandJustificationsEngine::new( network.clone(), runtime.clone(), justifications_protocol_name, - known_peers.clone(), + known_peers, ); let metrics = @@ -286,7 +287,6 @@ where payload_provider, network, key_store: key_store.into(), - known_peers, gossip_engine, gossip_validator, on_demand_justifications, diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs index 9a31d4a583d0e..6b9cf824d906d 100644 --- a/client/beefy/src/tests.rs +++ b/client/beefy/src/tests.rs @@ -314,6 +314,27 @@ pub(crate) fn create_beefy_keystore(authority: BeefyKeyring) -> SyncCryptoStoreP keystore } +fn voter_init_setup( + net: &mut BeefyTestNet, + finality: &mut futures::stream::Fuse>, +) -> sp_blockchain::Result> { + let backend = net.peer(0).client().as_backend(); + let api = Arc::new(crate::tests::two_validators::TestApi {}); + let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + let gossip_validator = + Arc::new(crate::communication::gossip::GossipValidator::new(known_peers)); + let mut gossip_engine = sc_network_gossip::GossipEngine::new( + net.peer(0).network_service().clone(), + "/beefy/whatever", + gossip_validator, + None, + ); + let best_grandpa = + futures::executor::block_on(wait_for_runtime_pallet(&*api, &mut gossip_engine, finality)) + .unwrap(); + load_or_init_voter_state(&*backend, &*api, best_grandpa, 1) +} + // Spawns beefy voters. Returns a future to spawn on the runtime. fn initialize_beefy( net: &mut BeefyTestNet, @@ -943,27 +964,6 @@ fn on_demand_beefy_justification_sync() { finalize_block_and_wait_for_beefy(&net, all_peers, &mut runtime, &[30], &[30]); } -fn test_voter_init_setup( - net: &mut BeefyTestNet, - finality: &mut futures::stream::Fuse>, -) -> sp_blockchain::Result> { - let backend = net.peer(0).client().as_backend(); - let api = Arc::new(crate::tests::two_validators::TestApi {}); - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - let gossip_validator = - Arc::new(crate::communication::gossip::GossipValidator::new(known_peers)); - let mut gossip_engine = sc_network_gossip::GossipEngine::new( - net.peer(0).network_service().clone(), - "/beefy/whatever", - gossip_validator, - None, - ); - let best_grandpa = - futures::executor::block_on(wait_for_runtime_pallet(&*api, &mut gossip_engine, finality)) - .unwrap(); - load_or_init_voter_state(&*backend, &*api, best_grandpa, 1) -} - #[test] fn should_initialize_voter_at_genesis() { let keys = &[BeefyKeyring::Alice]; @@ -981,7 +981,7 @@ fn should_initialize_voter_at_genesis() { net.peer(0).client().as_client().finalize_block(hashof13, None).unwrap(); // load persistent state - nothing in DB, should init at session boundary - let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1044,7 +1044,7 @@ fn should_initialize_voter_when_last_final_is_session_boundary() { // expect rounds initialized at last beefy finalized 10. // load persistent state - nothing in DB, should init at session boundary - let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap(); // verify voter initialized with single session starting at block 10 assert_eq!(persisted_state.voting_oracle().sessions().len(), 1); @@ -1103,7 +1103,7 @@ fn should_initialize_voter_at_latest_finalized() { // Test initialization at last BEEFY finalized. // load persistent state - nothing in DB, should init at last BEEFY finalized - let persisted_state = test_voter_init_setup(&mut net, &mut finality).unwrap(); + let persisted_state = voter_init_setup(&mut net, &mut finality).unwrap(); // verify voter initialized with single session starting at block 12 assert_eq!(persisted_state.voting_oracle().sessions().len(), 1); diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index e387fed79c6a0..6726fa4375387 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -16,25 +16,31 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{ - collections::{BTreeMap, BTreeSet, VecDeque}, - fmt::Debug, - marker::PhantomData, - sync::Arc, +use crate::{ + communication::{ + gossip::{topic, GossipValidator}, + request_response::outgoing_requests_engine::OnDemandJustificationsEngine, + }, + error::Error, + justification::BeefyVersionedFinalityProof, + keystore::BeefyKeystore, + metric_inc, metric_set, + metrics::Metrics, + round::Rounds, + BeefyVoterLinks, +}; +use beefy_primitives::{ + crypto::{AuthorityId, Signature}, + BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment, + ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; - use codec::{Codec, Decode, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; -use parking_lot::Mutex; - use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; -use sc_network_common::{ - protocol::event::Event as NetEvent, - service::{NetworkEventStream, NetworkRequest}, -}; +use sc_network_common::service::{NetworkEventStream, NetworkRequest}; use sc_network_gossip::GossipEngine; - +use sc_utils::notification::NotificationReceiver; use sp_api::{BlockId, ProvideRuntimeApi}; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; use sp_consensus::SyncOracle; @@ -44,26 +50,11 @@ use sp_runtime::{ traits::{Block, Header, NumberFor, Zero}, SaturatedConversion, }; - -use beefy_primitives::{ - crypto::{AuthorityId, Signature}, - BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment, - ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, -}; -use sc_utils::notification::NotificationReceiver; - -use crate::{ - communication::{ - gossip::{topic, GossipValidator}, - request_response::outgoing_requests_engine::OnDemandJustificationsEngine, - }, - error::Error, - justification::BeefyVersionedFinalityProof, - keystore::BeefyKeystore, - metric_inc, metric_set, - metrics::Metrics, - round::Rounds, - BeefyVoterLinks, KnownPeers, +use std::{ + collections::{BTreeMap, BTreeSet, VecDeque}, + fmt::Debug, + marker::PhantomData, + sync::Arc, }; pub(crate) enum RoundAction { @@ -253,7 +244,6 @@ pub(crate) struct WorkerParams { pub payload_provider: P, pub network: N, pub key_store: BeefyKeystore, - pub known_peers: Arc>>, pub gossip_engine: GossipEngine, pub gossip_validator: Arc>, pub on_demand_justifications: OnDemandJustificationsEngine, @@ -305,7 +295,6 @@ pub(crate) struct BeefyWorker { key_store: BeefyKeystore, // communication - known_peers: Arc>>, gossip_engine: GossipEngine, gossip_validator: Arc>, on_demand_justifications: OnDemandJustificationsEngine, @@ -349,7 +338,6 @@ where gossip_engine, gossip_validator, on_demand_justifications, - known_peers, links, metrics, persisted_state, @@ -359,7 +347,6 @@ where backend, payload_provider, network, - known_peers, key_store, gossip_engine, gossip_validator, @@ -783,6 +770,29 @@ where Ok(()) } + fn process_new_state(&mut self) { + // Handle pending justifications and/or votes for now GRANDPA finalized blocks. + if let Err(err) = self.try_pending_justif_and_votes() { + debug!(target: "beefy", "🥩 {}", err); + } + + // Don't bother voting or requesting justifications during major sync. + if !self.network.is_major_syncing() { + // There were external events, 'state' is changed, author a vote if needed/possible. + if let Err(err) = self.try_to_vote() { + debug!(target: "beefy", "🥩 {}", err); + } + // If the current target is a mandatory block, + // make sure there's also an on-demand justification request out for it. + if let Some(block) = self.voting_oracle().mandatory_pending() { + // This only starts new request if there isn't already an active one. + self.on_demand_justifications.request(block); + } + } else { + debug!(target: "beefy", "🥩 Skipping voting while major syncing."); + } + } + /// Main loop for BEEFY worker. /// /// Wait for BEEFY runtime pallet to be available, then start the main async loop @@ -794,7 +804,6 @@ where ) { info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block()); - let mut network_events = self.network.event_stream("network-gossip").fuse(); let mut votes = Box::pin( self.gossip_engine .messages_for(topic::()) @@ -810,25 +819,12 @@ where ); loop { - // Don't bother voting or requesting justifications during major sync. - if !self.network.is_major_syncing() { - // If the current target is a mandatory block, - // make sure there's also an on-demand justification request out for it. - if let Some(block) = self.voting_oracle().mandatory_pending() { - // This only starts new request if there isn't already an active one. - self.on_demand_justifications.request(block); - } - // There were external events, 'state' is changed, author a vote if needed/possible. - if let Err(err) = self.try_to_vote() { - debug!(target: "beefy", "🥩 {}", err); - } - } else { - debug!(target: "beefy", "🥩 Skipping voting while major syncing."); - } + // Act on changed 'state'. + self.process_new_state(); let mut gossip_engine = &mut self.gossip_engine; // Wait for, and handle external events. - // The branches below only change 'state', actual voting happen afterwards, + // The branches below only change 'state', actual voting happens afterwards, // based on the new resulting 'state'. futures::select_biased! { // Use `select_biased!` to prioritize order below. @@ -837,15 +833,6 @@ where error!(target: "beefy", "🥩 Gossip engine has terminated, closing worker."); return; }, - // Keep track of connected peers. - net_event = network_events.next() => { - if let Some(net_event) = net_event { - self.handle_network_event(net_event); - } else { - error!(target: "beefy", "🥩 Network events stream terminated, closing worker."); - return; - } - }, // Process finality notifications first since these drive the voter. notification = finality_notifications.next() => { if let Some(notification) = notification { @@ -888,25 +875,6 @@ where } }, } - - // Handle pending justifications and/or votes for now GRANDPA finalized blocks. - if let Err(err) = self.try_pending_justif_and_votes() { - debug!(target: "beefy", "🥩 {}", err); - } - } - } - - /// Update known peers based on network events. - fn handle_network_event(&mut self, event: NetEvent) { - match event { - NetEvent::SyncConnected { remote } => { - self.known_peers.lock().add_new(remote); - }, - NetEvent::SyncDisconnected { remote } => { - self.known_peers.lock().remove(&remote); - }, - // We don't care about other events. - _ => (), } } } @@ -976,11 +944,11 @@ pub(crate) mod tests { create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi, BeefyPeer, BeefyTestNet, }, - BeefyRPCLinks, + BeefyRPCLinks, KnownPeers, }; - use beefy_primitives::{known_payloads, mmr::MmrRootProvider}; use futures::{executor::block_on, future::poll_fn, task::Poll}; + use parking_lot::Mutex; use sc_client_api::{Backend as BackendT, HeaderBackend}; use sc_network::NetworkService; use sc_network_test::TestNetFactory; @@ -1058,7 +1026,7 @@ pub(crate) mod tests { network.clone(), api.clone(), "/beefy/justifs/1".into(), - known_peers.clone(), + known_peers, ); let at = BlockId::number(Zero::zero()); let genesis_header = backend.blockchain().expect_header(at).unwrap(); @@ -1074,7 +1042,6 @@ pub(crate) mod tests { backend, payload_provider, key_store: Some(keystore).into(), - known_peers, links, gossip_engine, gossip_validator,