Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Pump the gossip engine while waiting for the BEEFY runtime pallet (me…
Browse files Browse the repository at this point in the history
…mory leak fix) (#11694)

* Pump the gossip engine while waiting for the BEEFY runtime pallet

This fixes a memory leak when the BEEFY gadget is turned on, but
the runtime doesn't actually use BEEFY.

* Implement `FusedFuture` for `GossipEngine`

* Fuse futures outside of loops
  • Loading branch information
koute authored Jun 20, 2022
1 parent 6001b59 commit b66c051
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 37 deletions.
77 changes: 41 additions & 36 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ use std::{
};

use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
use futures::StreamExt;
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_client_api::{Backend, FinalityNotification};
use sc_network_gossip::GossipEngine;

use sp_api::{BlockId, ProvideRuntimeApi};
Expand Down Expand Up @@ -80,15 +79,14 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
runtime: Arc<R>,
key_store: BeefyKeystore,
signed_commitment_sender: BeefySignedCommitmentSender<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
/// Min delta in block numbers between two blocks, BEEFY should vote on
min_block_delta: u32,
metrics: Option<Metrics>,
rounds: Option<Rounds<Payload, B>>,
/// Buffer holding votes for blocks that the client hasn't seen finality for.
pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>,
finality_notifications: FinalityNotifications<B>,
/// Best block we received a GRANDPA notification for
best_grandpa_block_header: <B as Block>::Header,
/// Best block a BEEFY voting round has been concluded for
Expand Down Expand Up @@ -143,14 +141,13 @@ where
runtime,
key_store,
signed_commitment_sender,
gossip_engine: Arc::new(Mutex::new(gossip_engine)),
gossip_engine,
gossip_validator,
// always target at least one block better than current best beefy
min_block_delta: min_block_delta.max(1),
metrics,
rounds: None,
pending_votes: BTreeMap::new(),
finality_notifications: client.finality_notification_stream(),
best_grandpa_block_header: last_finalized_header,
best_beefy_block: None,
last_signed_id: 0,
Expand Down Expand Up @@ -471,15 +468,21 @@ where
true,
);

self.gossip_engine.lock().gossip_message(topic::<B>(), encoded_message, false);
self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
}

/// Wait for BEEFY runtime pallet to be available.
async fn wait_for_runtime_pallet(&mut self) {
self.client
.finality_notification_stream()
.take_while(|notif| {
let at = BlockId::hash(notif.header.hash());
let mut gossip_engine = &mut self.gossip_engine;
let mut finality_stream = self.client.finality_notification_stream().fuse();
loop {
futures::select! {
notif = finality_stream.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = BlockId::hash(notif.header.hash());
if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() {
if active.id() == GENESIS_AUTHORITY_SET_ID {
// When starting from genesis, there is no session boundary digest.
Expand All @@ -490,18 +493,18 @@ where
// worker won't vote until it witnesses a session change.
// Once we'll implement 'initial sync' (catch-up), the worker will be able to
// start voting right away.
self.handle_finality_notification(notif);
future::ready(false)
self.handle_finality_notification(&notif);
break
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
future::ready(true)
}
})
.for_each(|_| future::ready(()))
.await;
// get a new stream that provides _new_ notifications (from here on out)
self.finality_notifications = self.client.finality_notification_stream();
},
_ = gossip_engine => {
break
}
}
}
}

/// Main loop for BEEFY worker.
Expand All @@ -512,35 +515,37 @@ where
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number());
self.wait_for_runtime_pallet().await;

let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::<B>()).filter_map(
|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok()
},
));
let mut finality_notifications = self.client.finality_notification_stream().fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
.filter_map(|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok()
})
.fuse(),
);

loop {
while self.sync_oracle.is_major_syncing() {
debug!(target: "beefy", "Waiting for major sync to complete...");
futures_timer::Delay::new(Duration::from_secs(5)).await;
}

let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));

let mut gossip_engine = &mut self.gossip_engine;
futures::select! {
notification = self.finality_notifications.next().fuse() => {
notification = finality_notifications.next() => {
if let Some(notification) = notification {
self.handle_finality_notification(&notification);
} else {
return;
}
},
vote = votes.next().fuse() => {
vote = votes.next() => {
if let Some(vote) = vote {
let block_num = vote.commitment.block_number;
if block_num > *self.best_grandpa_block_header.number() {
Expand All @@ -563,7 +568,7 @@ where
return;
}
},
_ = gossip_engine.fuse() => {
_ = gossip_engine => {
error!(target: "beefy", "🥩 Gossip engine has terminated.");
return;
}
Expand Down
15 changes: 14 additions & 1 deletion client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct GossipEngine<B: BlockT> {
message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
/// Buffered messages (see [`ForwardingState`]).
forwarding_state: ForwardingState<B>,

is_terminated: bool,
}

/// A gossip engine receives messages from the network via the `network_event_stream` and forwards
Expand Down Expand Up @@ -94,6 +96,8 @@ impl<B: BlockT> GossipEngine<B> {
network_event_stream,
message_sinks: HashMap::new(),
forwarding_state: ForwardingState::Idle,

is_terminated: false,
}
}

Expand Down Expand Up @@ -214,7 +218,10 @@ impl<B: BlockT> Future for GossipEngine<B> {
Event::Dht(_) => {},
},
// The network event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(None) => {
self.is_terminated = true;
return Poll::Ready(())
},
Poll::Pending => break,
}
},
Expand Down Expand Up @@ -288,6 +295,12 @@ impl<B: BlockT> Future for GossipEngine<B> {
}
}

impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
fn is_terminated(&self) -> bool {
self.is_terminated
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit b66c051

Please sign in to comment.