Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Pump the gossip engine while waiting for the BEEFY runtime pallet (memory leak fix) #11694

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 41 additions & 36 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ use std::{
};

use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
use futures::StreamExt;
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_client_api::{Backend, FinalityNotification};
use sc_network_gossip::GossipEngine;

use sp_api::{BlockId, ProvideRuntimeApi};
Expand Down Expand Up @@ -80,15 +79,14 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
runtime: Arc<R>,
key_store: BeefyKeystore,
signed_commitment_sender: BeefySignedCommitmentSender<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
/// Min delta in block numbers between two blocks, BEEFY should vote on
min_block_delta: u32,
metrics: Option<Metrics>,
rounds: Option<Rounds<Payload, B>>,
/// Buffer holding votes for blocks that the client hasn't seen finality for.
pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>,
finality_notifications: FinalityNotifications<B>,
/// Best block we received a GRANDPA notification for
best_grandpa_block_header: <B as Block>::Header,
/// Best block a BEEFY voting round has been concluded for
Expand Down Expand Up @@ -143,14 +141,13 @@ where
runtime,
key_store,
signed_commitment_sender,
gossip_engine: Arc::new(Mutex::new(gossip_engine)),
gossip_engine,
gossip_validator,
// always target at least one block better than current best beefy
min_block_delta: min_block_delta.max(1),
metrics,
rounds: None,
pending_votes: BTreeMap::new(),
finality_notifications: client.finality_notification_stream(),
best_grandpa_block_header: last_finalized_header,
best_beefy_block: None,
last_signed_id: 0,
Expand Down Expand Up @@ -471,15 +468,21 @@ where
true,
);

self.gossip_engine.lock().gossip_message(topic::<B>(), encoded_message, false);
self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
}

/// Wait for BEEFY runtime pallet to be available.
async fn wait_for_runtime_pallet(&mut self) {
self.client
.finality_notification_stream()
.take_while(|notif| {
let at = BlockId::hash(notif.header.hash());
let mut gossip_engine = &mut self.gossip_engine;
let mut finality_stream = self.client.finality_notification_stream().fuse();
loop {
futures::select! {
notif = finality_stream.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = BlockId::hash(notif.header.hash());
if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() {
if active.id() == GENESIS_AUTHORITY_SET_ID {
// When starting from genesis, there is no session boundary digest.
Expand All @@ -490,18 +493,18 @@ where
// worker won't vote until it witnesses a session change.
// Once we'll implement 'initial sync' (catch-up), the worker will be able to
// start voting right away.
self.handle_finality_notification(notif);
future::ready(false)
self.handle_finality_notification(&notif);
break
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
future::ready(true)
}
})
.for_each(|_| future::ready(()))
.await;
// get a new stream that provides _new_ notifications (from here on out)
self.finality_notifications = self.client.finality_notification_stream();
},
_ = gossip_engine => {
break
}
}
}
}

/// Main loop for BEEFY worker.
Expand All @@ -512,35 +515,37 @@ where
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number());
self.wait_for_runtime_pallet().await;

let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::<B>()).filter_map(
|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok()
},
));
let mut finality_notifications = self.client.finality_notification_stream().fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
.filter_map(|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok()
})
.fuse(),
);

loop {
while self.sync_oracle.is_major_syncing() {
debug!(target: "beefy", "Waiting for major sync to complete...");
futures_timer::Delay::new(Duration::from_secs(5)).await;
}

let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));

let mut gossip_engine = &mut self.gossip_engine;
futures::select! {
notification = self.finality_notifications.next().fuse() => {
notification = finality_notifications.next() => {
if let Some(notification) = notification {
self.handle_finality_notification(&notification);
} else {
return;
}
},
vote = votes.next().fuse() => {
vote = votes.next() => {
if let Some(vote) = vote {
let block_num = vote.commitment.block_number;
if block_num > *self.best_grandpa_block_header.number() {
Expand All @@ -563,7 +568,7 @@ where
return;
}
},
_ = gossip_engine.fuse() => {
_ = gossip_engine => {
error!(target: "beefy", "🥩 Gossip engine has terminated.");
return;
}
Expand Down
15 changes: 14 additions & 1 deletion client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct GossipEngine<B: BlockT> {
message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
/// Buffered messages (see [`ForwardingState`]).
forwarding_state: ForwardingState<B>,

is_terminated: bool,
}

/// A gossip engine receives messages from the network via the `network_event_stream` and forwards
Expand Down Expand Up @@ -94,6 +96,8 @@ impl<B: BlockT> GossipEngine<B> {
network_event_stream,
message_sinks: HashMap::new(),
forwarding_state: ForwardingState::Idle,

is_terminated: false,
}
}

Expand Down Expand Up @@ -214,7 +218,10 @@ impl<B: BlockT> Future for GossipEngine<B> {
Event::Dht(_) => {},
},
// The network event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(None) => {
self.is_terminated = true;
return Poll::Ready(())
},
Poll::Pending => break,
}
},
Expand Down Expand Up @@ -288,6 +295,12 @@ impl<B: BlockT> Future for GossipEngine<B> {
}
}

impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
fn is_terminated(&self) -> bool {
self.is_terminated
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down