Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove all batches related to a peer on disconnect #5969

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
92a6e77
Remove all batches related to a peer on disconnect
pawanjay176 Jun 20, 2024
9d90e39
Cleanup map entries after disconnect
pawanjay176 Jun 21, 2024
70af7d1
Allow lookups to continue in case of disconnections
pawanjay176 Jun 21, 2024
312be2c
Pretty response types
pawanjay176 Jun 21, 2024
a38caf4
fmt
pawanjay176 Jun 21, 2024
c7fd21d
Fix lints
pawanjay176 Jun 24, 2024
a8f64f2
Remove lookup if it cannot progress
pawanjay176 Jun 24, 2024
bc10fb2
Fix tests
pawanjay176 Jun 24, 2024
cd17f9f
Remove poll_close on rpc behaviour
pawanjay176 Jun 24, 2024
6ead176
Remove redundant test
pawanjay176 Jun 24, 2024
3e1c41a
Fix issue raised by lion
pawanjay176 Jun 24, 2024
dfaf238
Revert pretty response types
pawanjay176 Jun 24, 2024
b930c7c
Cleanup
pawanjay176 Jun 24, 2024
62167fb
Fix test
pawanjay176 Jun 24, 2024
0c8efc1
Merge remote-tracking branch 'origin/release-v5.2.1' into rpc-error-o…
michaelsproul Jun 25, 2024
26614f1
Apply suggestions from joao
pawanjay176 Jun 25, 2024
ec90bf0
Fix log
pawanjay176 Jun 25, 2024
e79c71a
update request status on no peers found
pawanjay176 Jun 25, 2024
caee7c8
Do not remove lookup after peer disconnection
pawanjay176 Jun 25, 2024
804f36d
Add comments about expected event api
dapplion Jun 25, 2024
9b2e9e0
Update single_block_lookup.rs
dapplion Jun 25, 2024
cd68550
Update mod.rs
dapplion Jun 25, 2024
5b0fbd9
Merge branch 'rpc-error-on-disconnect-revert' into 5969-review
dapplion Jun 25, 2024
59468f8
Merge pull request #10 from dapplion/5969-review
pawanjay176 Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,37 +352,6 @@ where
!matches!(self.state, HandlerState::Deactivated)
}

// NOTE: This function gets polled to completion upon a connection close.
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
    // Drain the outbound substreams one by one. Every drained substream that was not
    // already closing is surfaced to the behaviour as a failed request.
    loop {
        let substream_id = match self.outbound_substreams.keys().next().cloned() {
            Some(id) => id,
            None => break,
        };
        let outbound_info = self
            .outbound_substreams
            .remove(&substream_id)
            .expect("The value must exist for a key");

        // A substream that is already closing is dropped without notifying the behaviour;
        // anything else is registered as an RPC error.
        if !matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
            return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
                error: RPCError::Disconnected,
                proto: outbound_info.proto,
                id: outbound_info.req_id,
            })));
        }
    }

    // With no outbound substreams left, hand over any events still queued for the
    // behaviour, oldest first; `None` signals there is nothing more to report.
    if self.events_out.is_empty() {
        Poll::Ready(None)
    } else {
        Poll::Ready(Some(self.events_out.remove(0)))
    }
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
94 changes: 1 addition & 93 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod common;

use common::Protocol;
use lighthouse_network::rpc::{methods::*, RPCError};
use lighthouse_network::rpc::methods::*;
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
use slog::{debug, warn, Level};
use ssz::Encode;
Expand Down Expand Up @@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
})
}

/// Sends a BlocksByRoot request with id 42, has the receiver hard-disconnect the requesting
/// peer once the request arrives, and expects the sender to observe `RPCFailed` for id 42
/// carrying `RPCError::Disconnected`. Any other error for that id panics, as does a 30 second
/// timeout.
#[test]
fn test_disconnect_triggers_rpc_error() {
// Set up logging: the log level and whether logging is enabled
let log_level = Level::Debug;
let enable_logging = false;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();

let rt = Arc::new(Runtime::new().unwrap());
// Build the sender/receiver node pair
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
// Must have at least one root for the request to create a stream
vec![Hash256::from_low_u64_be(0)],
&spec,
));

// Build the sender future: it sends the request on connection and completes only when
// the request fails with `RPCError::Disconnected`
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send the BlocksByRoot request with request id 42
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 42, rpc_request.clone())
.unwrap();
}
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
RPCError::Disconnected => return,
other => panic!("received unexpected error {:?}", other),
},
other => {
warn!(log, "Ignoring other event {:?}", other);
}
}
}
};

// The peer to disconnect. Set to `Some` when a request from that peer has been received
// and cleared again once the disconnect has been issued
let mut sending_peer = None;
let receiver_future = async {
loop {
// Wait for the next receiver event, but give up after one second so the loop keeps
// turning even when no event arrives
match futures::future::select(
Box::pin(receiver.next_event()),
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
)
.await
{
futures::future::Either::Left((ev, _)) => match ev {
NetworkEvent::RequestReceived { peer_id, .. } => {
sending_peer = Some(peer_id);
}
other => {
warn!(log, "Ignoring other event {:?}", other);
}
},
futures::future::Either::Right((_, _)) => {} // The timeout hit; fall through to the check below
}

// If a request has been received, hard-disconnect the peer that sent it
if let Some(peer_id) = sending_peer.take() {
warn!(log, "Receiver got request, disconnecting peer");
receiver.__hard_disconnect_testing_only(peer_id);
}
}
};

// The receiver future never returns on its own, so the test ends either when the sender
// future sees the disconnect error or when the 30 second timeout panics
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}

/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
/// Goodbye message.
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
Expand Down
38 changes: 36 additions & 2 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,49 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
) {
return Ok(());
}

self.active_requests.remove(peer_id);
if let Some(batch_ids) = self.active_requests.remove(peer_id) {
// fail the batches
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}

// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
Expand Down
18 changes: 5 additions & 13 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,21 +410,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
/* Check disconnection for single lookups */
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
self.single_block_lookups.retain(|_, lookup| {
lookup.remove_peer(peer_id);

// Note: this condition should be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
if lookup.has_no_peers() {
debug!(self.log,
"Dropping single lookup after peer disconnection";
"block_root" => ?lookup.block_root()
);
false
} else {
true
}
});
// Keep the lookup if it has some peers that can drive progress
// or if it has some downloaded components that can be processed
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
!lookup.has_no_peers() || lookup.can_progress_without_peer()
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
})
}

/* Processing responses */
Expand Down
22 changes: 18 additions & 4 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,15 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.peers.insert(peer_id)
}

/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.peers.remove(peer_id)
/// Remove peer from available peers.
///
/// The outcome of the removal is discarded, so callers cannot tell from this call whether
/// the peer was present.
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.remove(peer_id);
}

/// Returns true if either the block request or the blob request of this lookup is in a
/// state that can progress without a peer.
pub fn can_progress_without_peer(&self) -> bool {
    // The block request is consulted first; the blob request is only checked when the
    // block request cannot progress on its own.
    if self.block_request_state.state.can_progress_without_peer() {
        return true;
    }
    self.blob_request_state.state.can_progress_without_peer()
}

/// Returns true if this lookup has zero peers
Expand Down Expand Up @@ -349,6 +354,15 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

/// Returns true if this request is awaiting processing, being processed, or already
/// processed; returns false while it is awaiting download or downloading.
pub fn can_progress_without_peer(&self) -> bool {
    match self.state {
        State::AwaitingProcess { .. } => true,
        State::Processing { .. } => true,
        State::Processed { .. } => true,
        State::AwaitingDownload { .. } => false,
        State::Downloading { .. } => false,
    }
}

pub fn is_processed(&self) -> bool {
match self.state {
State::AwaitingDownload { .. }
Expand Down
47 changes: 30 additions & 17 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,6 @@ impl TestRig {
})
}

/// Sends a `SyncMessage::Disconnect` for `disconnected_peer_id` via `send_sync_message`.
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
}

/// Return RPCErrors for all active requests of peer
fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
self.drain_network_rx();
Expand All @@ -562,6 +558,10 @@ impl TestRig {
}
}

/// Sends a `SyncMessage::Disconnect` for `peer_id` via `send_sync_message`.
fn peer_disconnected(&mut self, peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(peer_id));
}

fn drain_network_rx(&mut self) {
while let Ok(event) = self.network_rx.try_recv() {
self.network_rx_queue.push(event);
Expand Down Expand Up @@ -1026,6 +1026,28 @@ fn test_single_block_lookup_failure() {
rig.expect_empty_network();
}

#[test]
fn test_single_block_lookup_peer_disconnected_then_rpc_error() {
let mut rig = TestRig::test_setup();

let block_hash = Hash256::random();
let peer_id = rig.new_connected_peer();

// Trigger the request
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
rig.trigger_unknown_block_from_attestation(block_hash, peer_id);
let id = rig.expect_block_lookup_request(block_hash);

// The peer disconnect event reaches sync before the rpc error
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
rig.peer_disconnected(peer_id);
// Only peer and no progress on lookup, so lookup should be removed
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
rig.expect_no_active_lookups();
// The request fails
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
rig.single_lookup_failed(id, peer_id, RPCError::Disconnected);
rig.expect_block_lookup_request(block_hash);
// The request should be removed from the network context on disconnection
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
rig.expect_empty_network();
}

#[test]
fn test_single_block_lookup_becomes_parent_request() {
let mut rig = TestRig::test_setup();
Expand Down Expand Up @@ -1289,19 +1311,10 @@ fn test_lookup_peer_disconnected_no_peers_left_while_request() {
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
rig.rpc_error_all_active_requests(peer_id);
rig.expect_no_active_lookups();
}

#[test]
fn test_lookup_peer_disconnected_no_peers_left_not_while_request() {
let mut rig = TestRig::test_setup();
let peer_id = rig.new_connected_peer();
let trigger_block = rig.rand_block();
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
// Note: this test case may be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
rig.expect_no_active_lookups();
// Erroring all rpc requests and disconnecting the peer shouldn't remove the active
// request as we can still progress.
// The parent lookup trigger in this case can still be progressed so it shouldn't be removed.
rig.assert_single_lookups_count(1);
}

#[test]
Expand Down
14 changes: 11 additions & 3 deletions beacon_node/network/src/sync/block_sidecar_coupling.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use beacon_chain::block_verification_types::RpcBlock;
use lighthouse_network::PeerId;
use ssz_types::VariableList;
use std::{collections::VecDeque, sync::Arc};
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
Expand All @@ -17,16 +18,19 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
is_sidecars_stream_terminated: bool,
/// Used to determine if this accumulator should wait for a sidecars stream termination
request_type: ByRangeRequestType,
/// The peer the request was made to.
pub(crate) peer_id: PeerId,
}

impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
pub fn new(request_type: ByRangeRequestType) -> Self {
pub fn new(request_type: ByRangeRequestType, peer_id: PeerId) -> Self {
Self {
accumulated_blocks: <_>::default(),
accumulated_sidecars: <_>::default(),
is_blocks_stream_terminated: <_>::default(),
is_sidecars_stream_terminated: <_>::default(),
request_type,
peer_id,
}
}

Expand Down Expand Up @@ -109,12 +113,14 @@ mod tests {
use super::BlocksAndBlobsRequestInfo;
use crate::sync::range_sync::ByRangeRequestType;
use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs};
use lighthouse_network::PeerId;
use rand::SeedableRng;
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};

#[test]
fn no_blobs_into_responses() {
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks);
let peer_id = PeerId::random();
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::Blocks, peer_id);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
Expand All @@ -133,7 +139,9 @@ mod tests {

#[test]
fn empty_blobs_into_responses() {
let mut info = BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs);
let peer_id = PeerId::random();
let mut info =
BlocksAndBlobsRequestInfo::<E>::new(ByRangeRequestType::BlocksAndBlobs, peer_id);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {
Expand Down
Loading