Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Poll the substream validation before polling Notifications (#13934)
Browse files Browse the repository at this point in the history
* Poll the substream validation before polling `Notifications`

In tests, it can happen that `Notifications` doesn't produce any events
which causes `poll()` to return `Poll::Pending` and the substream
validation futures won't get polled.

Poll the futures before calling `Notifications` so results for substream
validations are received even if `Notifications` is not producing any
events.

* Remove `pending_messages`

* Remove unused import
  • Loading branch information
altonen authored Apr 18, 2023
1 parent 785115b commit af29c6f
Showing 1 changed file with 16 additions and 28 deletions.
44 changes: 16 additions & 28 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
future::Future,
iter,
pin::Pin,
Expand Down Expand Up @@ -77,8 +77,6 @@ type PendingSyncSubstreamValidation =

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome>,
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
/// Handles opening the unique substream and sending and receiving raw messages.
Expand Down Expand Up @@ -181,7 +179,6 @@ impl<B: BlockT> Protocol<B> {
};

let protocol = Self {
pending_messages: VecDeque::new(),
peerset_handle: peerset_handle.clone(),
behaviour,
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
Expand Down Expand Up @@ -409,8 +406,21 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let event = match self.behaviour.poll(cx, params) {
Expand All @@ -430,23 +440,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};

while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
Expand Down Expand Up @@ -509,7 +502,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
) {
Ok(handshake) => {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);

let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
Expand Down Expand Up @@ -644,10 +636,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
}

if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}

// This block can only be reached if an event was pulled from the behaviour and that
// resulted in `CustomMessageOutcome::None`. Since there might be another pending
// message from the behaviour, the task is scheduled again.
Expand Down

0 comments on commit af29c6f

Please sign in to comment.