Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop gossipsub stale messages when polling ConnectionHandler. #5175

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions beacon_node/lighthouse_network/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ impl EnabledHandler {
});
}

// We may need to inform the behviour if we have a dropped a message. This gets set if that
// is the case.
let mut dropped_message = None;

// process outbound stream
loop {
match std::mem::replace(
Expand All @@ -271,10 +267,11 @@ impl EnabledHandler {
} => {
if Pin::new(timeout).poll(cx).is_ready() {
// Inform the behaviour and end the poll.
dropped_message = Some(HandlerEvent::MessageDropped(message));
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(message),
));
}
}
_ => {} // All other messages are not time-bound.
Expand Down Expand Up @@ -348,13 +345,7 @@ impl EnabledHandler {
}
}

// If there was a timeout in sending a message, inform the behaviour before restarting the
// poll
if let Some(handler_event) = dropped_message {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event));
}

// Handle inbound messages
// Handle inbound messages.
loop {
match std::mem::replace(
&mut self.inbound_substream,
Expand Down Expand Up @@ -419,6 +410,32 @@ impl EnabledHandler {
}
}

// Drop the next message in queue if it's stale.
let mut peakable = self.send_queue.clone().peekable();
if let Poll::Ready(Some(mut message)) = peakable.poll_next_unpin(cx) {
match message {
RpcOut::Publish {
message: _,
ref mut timeout,
}
| RpcOut::Forward {
message: _,
ref mut timeout,
} => {
if Pin::new(timeout).poll(cx).is_ready() {
// Drop the message.
let dropped = futures::ready!(self.send_queue.poll_next_unpin(cx))
.expect("There should be a message");
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(dropped),
));
}
}
// the next message in queue is not time bound.
_ => {}
}
}

Poll::Pending
}
}
Expand Down
Loading