From 5ce16714b20420e72a8f7a314f7f2ce6c1c8cc5b Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Fri, 3 May 2024 11:50:42 +1000
Subject: [PATCH] Report RPC Errors to the application on peer disconnections (#5680)

Squashed commit of the following:

commit 02f1b2d53198fc1f0b1e7d31a1a7dd5fe45346eb
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Fri May 3 10:17:42 2024 +0900

    Drop lookups after peer disconnect and not awaiting events

commit f5dc1a3eb1cea408f8e061802f0ba8c95f9a3ff3
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Wed May 1 17:14:50 2024 +0900

    Expect RPCError::Disconnect to fail ongoing requests

commit 888f129bd227fedc4a7896af1429d9b3232f21e2
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Wed May 1 14:14:22 2024 +0900

    Report RPC Errors to the application on peer disconnections

Co-authored-by: Age Manning
---
 .../lighthouse_network/src/rpc/handler.rs     | 25 +++++
 .../lighthouse_network/src/service/mod.rs     |  6 ++
 .../lighthouse_network/tests/rpc_tests.rs     | 92 ++++++++++++++++++-
 .../network/src/sync/backfill_sync/mod.rs     | 38 +-------
 .../network/src/sync/block_lookups/mod.rs     | 15 ++-
 .../sync/block_lookups/single_block_lookup.rs | 26 ++----
 .../network/src/sync/block_lookups/tests.rs   | 21 ++++-
 beacon_node/network/src/sync/manager.rs       |  4 +-
 .../network/src/sync/range_sync/chain.rs      | 26 +-----
 .../network/src/sync/range_sync/range.rs      |  5 +-
 10 files changed, 163 insertions(+), 95 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs
index df5bbba99c8..daf95fb8c91 100644
--- a/beacon_node/lighthouse_network/src/rpc/handler.rs
+++ b/beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -352,6 +352,31 @@ where
         !matches!(self.state, HandlerState::Deactivated)
     }
 
+    // NOTE: This function gets polled to completion upon a connection close.
+    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
+        // Inform the network behaviour of any failed requests
+
+        while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
+            let outbound_info = self
+                .outbound_substreams
+                .remove(&substream_id)
+                .expect("The value must exist for a key");
+            // If the state of the connection is closing, we do not need to report this case to
+            // the behaviour, as the connection has just closed non-gracefully
+            if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
+                continue;
+            }
+
+            // Register this request as an RPC Error
+            return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
+                error: RPCError::Disconnected,
+                proto: outbound_info.proto,
+                id: outbound_info.req_id,
+            })));
+        }
+        Poll::Ready(None)
+    }
+
     fn poll(
         &mut self,
         cx: &mut Context<'_>,
diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs
index 1397c80fbd3..c83cc615549 100644
--- a/beacon_node/lighthouse_network/src/service/mod.rs
+++ b/beacon_node/lighthouse_network/src/service/mod.rs
@@ -972,6 +972,12 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
             .goodbye_peer(peer_id, reason, source);
     }
 
+    /// Hard (ungraceful) disconnect for testing purposes only.
+    /// Use `goodbye_peer` for disconnections; do not use this function.
+    pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) {
+        let _ = self.swarm.disconnect_peer_id(peer_id);
+    }
+
     /// Returns an iterator over all enr entries in the DHT.
     pub fn enr_entries(&self) -> Vec<Enr> {
         self.discovery().table_entries_enr()
diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs
index a60af4db3db..e2b72f86732 100644
--- a/beacon_node/lighthouse_network/tests/rpc_tests.rs
+++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -3,7 +3,7 @@
 mod common;
 
 use common::Protocol;
-use lighthouse_network::rpc::methods::*;
+use lighthouse_network::rpc::{methods::*, RPCError};
 use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
 use slog::{debug, warn, Level};
 use ssz::Encode;
@@ -996,6 +996,96 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
     })
 }
 
+#[test]
+fn test_disconnect_triggers_rpc_error() {
+    // set up the logging. The level and whether logging is enabled
+    let log_level = Level::Debug;
+    let enable_logging = false;
+
+    let log = common::build_log(log_level, enable_logging);
+    let spec = E::default_spec();
+
+    let rt = Arc::new(Runtime::new().unwrap());
+    // get sender/receiver
+    rt.block_on(async {
+        let (mut sender, mut receiver) = common::build_node_pair(
+            Arc::downgrade(&rt),
+            &log,
+            ForkName::Base,
+            &spec,
+            Protocol::Tcp,
+        )
+        .await;
+
+        // BlocksByRoot Request
+        let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
+            // Must have at least one root for the request to create a stream
+            vec![Hash256::from_low_u64_be(0)],
+            &spec,
+        ));
+
+        // build the sender future
+        let sender_future = async {
+            loop {
+                match sender.next_event().await {
+                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
+                        // Send the BlocksByRoot request
+                        debug!(log, "Sending RPC");
+                        sender.send_request(peer_id, 42, rpc_request.clone());
+                    }
+                    NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
+                        RPCError::Disconnected => return,
+                        other => panic!("received unexpected error {:?}", other),
+                    },
+                    other => {
+                        warn!(log, "Ignoring other event {:?}", other);
+                    }
+                }
+            }
+        };
+
+        // determine messages to send (PeerId, RequestId). If some, indicates we still need to send
+        // messages
+        let mut sending_peer = None;
+        let receiver_future = async {
+            loop {
+                // this future either drives the sending/receiving or times out allowing messages to be
+                // sent in the timeout
+                match futures::future::select(
+                    Box::pin(receiver.next_event()),
+                    Box::pin(tokio::time::sleep(Duration::from_secs(1))),
+                )
+                .await
+                {
+                    futures::future::Either::Left((ev, _)) => match ev {
+                        NetworkEvent::RequestReceived { peer_id, .. } => {
+                            sending_peer = Some(peer_id);
+                        }
+                        other => {
+                            warn!(log, "Ignoring other event {:?}", other);
+                        }
+                    },
+                    futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
+                }
+
+                // if we need to send messages send them here. This will happen after a delay
+                if let Some(peer_id) = sending_peer.take() {
+                    warn!(log, "Receiver got request, disconnecting peer");
+                    receiver.__hard_disconnect_testing_only(peer_id);
+                }
+            }
+        };
+
+        tokio::select! {
+            _ = sender_future => {}
+            _ = receiver_future => {}
+            _ = sleep(Duration::from_secs(30)) => {
+                panic!("Future timed out");
+            }
+        }
+    })
+}
+
 /// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
 /// Goodbye message.
 fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs
index 9075bb15f08..4be92d59a4b 100644
--- a/beacon_node/network/src/sync/backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -307,11 +307,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
     /// A peer has disconnected.
     /// If the peer has active batches, those are considered failed and re-requested.
     #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
-    pub fn peer_disconnected(
-        &mut self,
-        peer_id: &PeerId,
-        network: &mut SyncNetworkContext<T>,
-    ) -> Result<(), BackFillError> {
+    pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
         if matches!(
             self.state(),
             BackFillState::Failed | BackFillState::NotRequired
@@ -319,37 +315,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
             return Ok(());
         }
 
-        if let Some(batch_ids) = self.active_requests.remove(peer_id) {
-            // fail the batches
-            for id in batch_ids {
-                if let Some(batch) = self.batches.get_mut(&id) {
-                    match batch.download_failed(false) {
-                        Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
-                            self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
-                        }
-                        Ok(BatchOperationOutcome::Continue) => {}
-                        Err(e) => {
-                            self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
-                        }
-                    }
-                    // If we have run out of peers in which to retry this batch, the backfill state
-                    // transitions to a paused state.
-                    // We still need to reset the state for all the affected batches, so we should not
-                    // short circuit early
-                    if self.retry_batch_download(network, id).is_err() {
-                        debug!(
-                            self.log,
-                            "Batch could not be retried";
-                            "batch_id" => id,
-                            "error" => "no synced peers"
-                        );
-                    }
-                } else {
-                    debug!(self.log, "Batch not found while removing peer";
-                        "peer" => %peer_id, "batch" => id)
-                }
-            }
-        }
+        self.active_requests.remove(peer_id);
 
         // Remove the peer from the participation list
         self.participating_peers.remove(peer_id);
diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index 6852761d8bf..1b0c683c109 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -374,16 +374,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
     /* Error responses */
 
     pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
-        /* Check disconnection for single lookups */
-        self.single_block_lookups.retain(|_, req| {
-            let should_drop_lookup =
-                req.should_drop_lookup_on_disconnected_peer(peer_id);
-
-            if should_drop_lookup {
-                debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root());
+        self.single_block_lookups.retain(|_, lookup| {
+            if lookup.remove_peer(peer_id) {
+                debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
+                false
+            } else {
+                true
             }
-
-            !should_drop_lookup
         });
     }
 
diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
index b642ec8e5b2..b3e5a8da098 100644
--- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
+++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -191,21 +191,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
             && self.blob_request_state.state.is_processed()
     }
 
-    /// Checks both the block and blob request states to see if the peer is disconnected.
-    ///
-    /// Returns true if the lookup should be dropped.
-    pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
-        self.block_request_state.state.remove_peer(peer_id);
-        self.blob_request_state.state.remove_peer(peer_id);
-
-        if self.all_available_peers().count() == 0 {
-            return true;
-        }
-
-        // Note: if the peer disconnected happens to have an on-going request associated with this
-        // lookup we will receive an RPCError and the lookup will fail. No need to manually retry
-        // now.
-        false
+    /// Remove peer from available peers. Return true if there are no more available peers and all
+    /// requests are not expecting any future event (AwaitingDownload).
+    pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
+        self.block_request_state.state.remove_peer(peer_id)
+            && self.blob_request_state.state.remove_peer(peer_id)
     }
 }
 
@@ -469,9 +459,11 @@ impl SingleLookupRequestState {
         self.available_peers.insert(*peer_id);
     }
 
-    /// If a peer disconnects, this request could be failed. If so, an error is returned.
-    pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
+    /// Remove peer from available peers. Return true if there are no more available peers and the
+    /// request is not expecting any future event (AwaitingDownload).
+    pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
         self.available_peers.remove(disconnected_peer_id);
+        self.available_peers.is_empty() && self.is_awaiting_download()
     }
 
     pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index 302a0489c3b..75e0fc524f1 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -450,8 +450,25 @@ impl TestRig {
         })
     }
 
-    fn peer_disconnected(&mut self, peer_id: PeerId) {
-        self.send_sync_message(SyncMessage::Disconnect(peer_id));
+    fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
+        self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
+
+        // Return RPCErrors for all active requests of peer
+        self.drain_network_rx();
+        while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
+            NetworkMessage::SendRequest {
+                peer_id,
+                request_id: RequestId::Sync(id),
+                ..
+            } if *peer_id == disconnected_peer_id => Some(*id),
+            _ => None,
+        }) {
+            self.send_sync_message(SyncMessage::RpcError {
+                peer_id: disconnected_peer_id,
+                request_id,
+                error: RPCError::Disconnected,
+            });
+        }
     }
 
     fn drain_network_rx(&mut self) {
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 0836d97c49f..2b72e3db125 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -366,9 +366,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         self.range_sync.peer_disconnect(&mut self.network, peer_id);
         self.block_lookups.peer_disconnected(peer_id);
         // Regardless of the outcome, we update the sync status.
-        let _ = self
-            .backfill_sync
-            .peer_disconnected(peer_id, &mut self.network);
+        let _ = self.backfill_sync.peer_disconnected(peer_id);
         self.update_sync_state();
     }
 
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index c60cdb2cc9f..9a6c99ebf6c 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -174,30 +174,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     /// Removes a peer from the chain.
     /// If the peer has active batches, those are considered failed and re-requested.
-    pub fn remove_peer(
-        &mut self,
-        peer_id: &PeerId,
-        network: &mut SyncNetworkContext<T>,
-    ) -> ProcessingResult {
-        if let Some(batch_ids) = self.peers.remove(peer_id) {
-            // fail the batches
-            for id in batch_ids {
-                if let Some(batch) = self.batches.get_mut(&id) {
-                    if let BatchOperationOutcome::Failed { blacklist } =
-                        batch.download_failed(true)?
-                    {
-                        return Err(RemoveChain::ChainFailed {
-                            blacklist,
-                            failing_batch: id,
-                        });
-                    }
-                    self.retry_batch_download(network, id)?;
-                } else {
-                    debug!(self.log, "Batch not found while removing peer";
-                        "peer" => %peer_id, "batch" => id)
-                }
-            }
-        }
+    pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
+        self.peers.remove(peer_id);
 
         if self.peers.is_empty() {
             Err(RemoveChain::EmptyPeerPool)
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index c8e82666840..fe48db35b45 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -278,9 +278,8 @@ where
     /// for this peer. If so we mark the batch as failed. The batch may then hit its maximum
     /// retries. In this case, we need to remove the chain.
     fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
-        for (removed_chain, sync_type, remove_reason) in self
-            .chains
-            .call_all(|chain| chain.remove_peer(peer_id, network))
+        for (removed_chain, sync_type, remove_reason) in
+            self.chains.call_all(|chain| chain.remove_peer(peer_id))
         {
             self.on_chain_removed(
                 removed_chain,