Skip to content

Commit

Permalink
Pump the gossip engine while waiting for the BEEFY runtime pallet (me…
Browse files Browse the repository at this point in the history
…mory leak fix) (paritytech#11694)

* Pump the gossip engine while waiting for the BEEFY runtime pallet

This fixes a memory leak when the BEEFY gadget is turned on, but
the runtime doesn't actually use BEEFY.

* Implement `FusedFuture` for `GossipEngine`

* Fuse futures outside of loops
  • Loading branch information
koute authored and ark0f committed Feb 27, 2023
1 parent 2231ee1 commit ab6cecb
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 37 deletions.
77 changes: 41 additions & 36 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ use std::{
};

use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
use futures::StreamExt;
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_client_api::{Backend, FinalityNotification};
use sc_network_gossip::GossipEngine;

use sp_api::{BlockId, ProvideRuntimeApi};
Expand Down Expand Up @@ -80,15 +79,14 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
runtime: Arc<R>,
key_store: BeefyKeystore,
signed_commitment_sender: BeefySignedCommitmentSender<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
gossip_engine: GossipEngine<B>,
gossip_validator: Arc<GossipValidator<B>>,
/// Min delta in block numbers between two blocks, BEEFY should vote on
min_block_delta: u32,
metrics: Option<Metrics>,
rounds: Option<Rounds<Payload, B>>,
/// Buffer holding votes for blocks that the client hasn't seen finality for.
pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>,
finality_notifications: FinalityNotifications<B>,
/// Best block we received a GRANDPA notification for
best_grandpa_block_header: <B as Block>::Header,
/// Best block a BEEFY voting round has been concluded for
Expand Down Expand Up @@ -143,14 +141,13 @@ where
runtime,
key_store,
signed_commitment_sender,
gossip_engine: Arc::new(Mutex::new(gossip_engine)),
gossip_engine,
gossip_validator,
// always target at least one block better than current best beefy
min_block_delta: min_block_delta.max(1),
metrics,
rounds: None,
pending_votes: BTreeMap::new(),
finality_notifications: client.finality_notification_stream(),
best_grandpa_block_header: last_finalized_header,
best_beefy_block: None,
last_signed_id: 0,
Expand Down Expand Up @@ -471,15 +468,21 @@ where
true,
);

self.gossip_engine.lock().gossip_message(topic::<B>(), encoded_message, false);
self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
}

/// Wait for BEEFY runtime pallet to be available.
async fn wait_for_runtime_pallet(&mut self) {
self.client
.finality_notification_stream()
.take_while(|notif| {
let at = BlockId::hash(notif.header.hash());
let mut gossip_engine = &mut self.gossip_engine;
let mut finality_stream = self.client.finality_notification_stream().fuse();
loop {
futures::select! {
notif = finality_stream.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = BlockId::hash(notif.header.hash());
if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() {
if active.id() == GENESIS_AUTHORITY_SET_ID {
// When starting from genesis, there is no session boundary digest.
Expand All @@ -490,18 +493,18 @@ where
// worker won't vote until it witnesses a session change.
// Once we'll implement 'initial sync' (catch-up), the worker will be able to
// start voting right away.
self.handle_finality_notification(notif);
future::ready(false)
self.handle_finality_notification(&notif);
break
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
future::ready(true)
}
})
.for_each(|_| future::ready(()))
.await;
// get a new stream that provides _new_ notifications (from here on out)
self.finality_notifications = self.client.finality_notification_stream();
},
_ = gossip_engine => {
break
}
}
}
}

/// Main loop for BEEFY worker.
Expand All @@ -512,35 +515,37 @@ where
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number());
self.wait_for_runtime_pallet().await;

let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::<B>()).filter_map(
|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok()
},
));
let mut finality_notifications = self.client.finality_notification_stream().fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
.filter_map(|notification| async move {
trace!(target: "beefy", "🥩 Got vote message: {:?}", notification);

VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
&mut &notification.message[..],
)
.ok()
})
.fuse(),
);

loop {
while self.sync_oracle.is_major_syncing() {
debug!(target: "beefy", "Waiting for major sync to complete...");
futures_timer::Delay::new(Duration::from_secs(5)).await;
}

let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));

let mut gossip_engine = &mut self.gossip_engine;
futures::select! {
notification = self.finality_notifications.next().fuse() => {
notification = finality_notifications.next() => {
if let Some(notification) = notification {
self.handle_finality_notification(&notification);
} else {
return;
}
},
vote = votes.next().fuse() => {
vote = votes.next() => {
if let Some(vote) = vote {
let block_num = vote.commitment.block_number;
if block_num > *self.best_grandpa_block_header.number() {
Expand All @@ -563,7 +568,7 @@ where
return;
}
},
_ = gossip_engine.fuse() => {
_ = gossip_engine => {
error!(target: "beefy", "🥩 Gossip engine has terminated.");
return;
}
Expand Down
15 changes: 14 additions & 1 deletion client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct GossipEngine<B: BlockT> {
message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
/// Buffered messages (see [`ForwardingState`]).
forwarding_state: ForwardingState<B>,

is_terminated: bool,
}

/// A gossip engine receives messages from the network via the `network_event_stream` and forwards
Expand Down Expand Up @@ -94,6 +96,8 @@ impl<B: BlockT> GossipEngine<B> {
network_event_stream,
message_sinks: HashMap::new(),
forwarding_state: ForwardingState::Idle,

is_terminated: false,
}
}

Expand Down Expand Up @@ -214,7 +218,10 @@ impl<B: BlockT> Future for GossipEngine<B> {
Event::Dht(_) => {},
},
// The network event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(None) => {
self.is_terminated = true;
return Poll::Ready(())
},
Poll::Pending => break,
}
},
Expand Down Expand Up @@ -288,6 +295,12 @@ impl<B: BlockT> Future for GossipEngine<B> {
}
}

impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
fn is_terminated(&self) -> bool {
self.is_terminated
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit ab6cecb

Please sign in to comment.