diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index ae466a71abb57..735dea0170a62 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -25,11 +25,10 @@ use std::{ }; use codec::{Codec, Decode, Encode}; -use futures::{future, FutureExt, StreamExt}; +use futures::StreamExt; use log::{debug, error, info, log_enabled, trace, warn}; -use parking_lot::Mutex; -use sc_client_api::{Backend, FinalityNotification, FinalityNotifications}; +use sc_client_api::{Backend, FinalityNotification}; use sc_network_gossip::GossipEngine; use sp_api::{BlockId, ProvideRuntimeApi}; @@ -80,7 +79,7 @@ pub(crate) struct BeefyWorker { runtime: Arc, key_store: BeefyKeystore, signed_commitment_sender: BeefySignedCommitmentSender, - gossip_engine: Arc>>, + gossip_engine: GossipEngine, gossip_validator: Arc>, /// Min delta in block numbers between two blocks, BEEFY should vote on min_block_delta: u32, @@ -88,7 +87,6 @@ pub(crate) struct BeefyWorker { rounds: Option>, /// Buffer holding votes for blocks that the client hasn't seen finality for. pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, - finality_notifications: FinalityNotifications, /// Best block we received a GRANDPA notification for best_grandpa_block_header: ::Header, /// Best block a BEEFY voting round has been concluded for @@ -143,14 +141,13 @@ where runtime, key_store, signed_commitment_sender, - gossip_engine: Arc::new(Mutex::new(gossip_engine)), + gossip_engine, gossip_validator, // always target at least one block better than current best beefy min_block_delta: min_block_delta.max(1), metrics, rounds: None, pending_votes: BTreeMap::new(), - finality_notifications: client.finality_notification_stream(), best_grandpa_block_header: last_finalized_header, best_beefy_block: None, last_signed_id: 0, @@ -471,15 +468,21 @@ where true, ); - self.gossip_engine.lock().gossip_message(topic::(), encoded_message, false); + self.gossip_engine.gossip_message(topic::(), encoded_message, false); } /// Wait for BEEFY runtime pallet to be available. async fn wait_for_runtime_pallet(&mut self) { - self.client - .finality_notification_stream() - .take_while(|notif| { - let at = BlockId::hash(notif.header.hash()); + let mut gossip_engine = &mut self.gossip_engine; + let mut finality_stream = self.client.finality_notification_stream().fuse(); + loop { + futures::select! { + notif = finality_stream.next() => { + let notif = match notif { + Some(notif) => notif, + None => break + }; + let at = BlockId::hash(notif.header.hash()); if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() { if active.id() == GENESIS_AUTHORITY_SET_ID { // When starting from genesis, there is no session boundary digest. @@ -490,18 +493,18 @@ where // worker won't vote until it witnesses a session change. // Once we'll implement 'initial sync' (catch-up), the worker will be able to // start voting right away. - self.handle_finality_notification(notif); - future::ready(false) + self.handle_finality_notification(¬if); + break } else { trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available..."); - future::ready(true) } - }) - .for_each(|_| future::ready(())) - .await; - // get a new stream that provides _new_ notifications (from here on out) - self.finality_notifications = self.client.finality_notification_stream(); + }, + _ = gossip_engine => { + break + } + } + } } /// Main loop for BEEFY worker. @@ -512,16 +515,20 @@ where info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); self.wait_for_runtime_pallet().await; - let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map( - |notification| async move { - trace!(target: "beefy", "🥩 Got vote message: {:?}", notification); - - VoteMessage::, AuthorityId, Signature>::decode( - &mut ¬ification.message[..], - ) - .ok() - }, - )); + let mut finality_notifications = self.client.finality_notification_stream().fuse(); + let mut votes = Box::pin( + self.gossip_engine + .messages_for(topic::()) + .filter_map(|notification| async move { + trace!(target: "beefy", "🥩 Got vote message: {:?}", notification); + + VoteMessage::, AuthorityId, Signature>::decode( + &mut ¬ification.message[..], + ) + .ok() + }) + .fuse(), + ); loop { while self.sync_oracle.is_major_syncing() { @@ -529,18 +536,16 @@ where futures_timer::Delay::new(Duration::from_secs(5)).await; } - let engine = self.gossip_engine.clone(); - let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); - + let mut gossip_engine = &mut self.gossip_engine; futures::select! { - notification = self.finality_notifications.next().fuse() => { + notification = finality_notifications.next() => { if let Some(notification) = notification { self.handle_finality_notification(¬ification); } else { return; } }, - vote = votes.next().fuse() => { + vote = votes.next() => { if let Some(vote) = vote { let block_num = vote.commitment.block_number; if block_num > *self.best_grandpa_block_header.number() { @@ -563,7 +568,7 @@ where return; } }, - _ = gossip_engine.fuse() => { + _ = gossip_engine => { error!(target: "beefy", "🥩 Gossip engine has terminated."); return; } diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 2e09e7cc614a4..2d086e89b4a10 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -53,6 +53,8 @@ pub struct GossipEngine { message_sinks: HashMap>>, /// Buffered messages (see [`ForwardingState`]). forwarding_state: ForwardingState, + + is_terminated: bool, } /// A gossip engine receives messages from the network via the `network_event_stream` and forwards @@ -94,6 +96,8 @@ impl GossipEngine { network_event_stream, message_sinks: HashMap::new(), forwarding_state: ForwardingState::Idle, + + is_terminated: false, } } @@ -214,7 +218,10 @@ impl Future for GossipEngine { Event::Dht(_) => {}, }, // The network event stream closed. Do the same for [`GossipValidator`]. - Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(None) => { + self.is_terminated = true; + return Poll::Ready(()) + }, Poll::Pending => break, } }, @@ -288,6 +295,12 @@ impl Future for GossipEngine { } } +impl futures::future::FusedFuture for GossipEngine { + fn is_terminated(&self) -> bool { + self.is_terminated + } +} + #[cfg(test)] mod tests { use super::*;