From 92a6e77681eba2a889fed29a2e7ecd52613fc8f4 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 20 Jun 2024 12:50:19 +0530 Subject: [PATCH 01/21] Remove all batches related to a peer on disconnect --- .../network/src/sync/backfill_sync/mod.rs | 38 ++++++++++++++++++- .../network/src/sync/block_lookups/mod.rs | 20 ++++------ .../sync/block_lookups/single_block_lookup.rs | 8 ++++ .../network/src/sync/block_lookups/tests.rs | 2 + beacon_node/network/src/sync/manager.rs | 4 +- .../network/src/sync/range_sync/chain.rs | 26 ++++++++++++- .../network/src/sync/range_sync/range.rs | 5 ++- 7 files changed, 84 insertions(+), 19 deletions(-) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index ce7d04ac0ac..29d209bbe38 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -307,7 +307,11 @@ impl BackFillSync { /// A peer has disconnected. /// If the peer has active batches, those are considered failed and re-requested. 
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> { + pub fn peer_disconnected( + &mut self, + peer_id: &PeerId, + network: &mut SyncNetworkContext, + ) -> Result<(), BackFillError> { if matches!( self.state(), BackFillState::Failed | BackFillState::NotRequired @@ -315,7 +319,37 @@ impl BackFillSync { return Ok(()); } - self.active_requests.remove(peer_id); + if let Some(batch_ids) = self.active_requests.remove(peer_id) { + // fail the batches + for id in batch_ids { + if let Some(batch) = self.batches.get_mut(&id) { + match batch.download_failed(false) { + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(id))?; + } + Ok(BatchOperationOutcome::Continue) => {} + Err(e) => { + self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; + } + } + // If we have run out of peers in which to retry this batch, the backfill state + // transitions to a paused state. 
+ // We still need to reset the state for all the affected batches, so we should not + // short circuit early + if self.retry_batch_download(network, id).is_err() { + debug!( + self.log, + "Batch could not be retried"; + "batch_id" => id, + "error" => "no synced peers" + ); + } + } else { + debug!(self.log, "Batch not found while removing peer"; + "peer" => %peer_id, "batch" => id) + } + } + } // Remove the peer from the participation list self.participating_peers.remove(peer_id); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f685b7e59db..a47c9f3840e 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -410,20 +410,16 @@ impl BlockLookups { /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId) { - self.single_block_lookups.retain(|_, lookup| { - lookup.remove_peer(peer_id); + /* Check disconnection for single lookups */ + self.single_block_lookups.retain(|_, req| { + let should_drop_lookup = + req.should_drop_lookup_on_disconnected_peer(peer_id ); - // Note: this condition should be removed in the future. It's not strictly necessary to drop a - // lookup if there are no peers left. 
Lookup should only be dropped if it can not make progress - if lookup.has_no_peers() { - debug!(self.log, - "Dropping single lookup after peer disconnection"; - "block_root" => ?lookup.block_root() - ); - false - } else { - true + if should_drop_lookup { + debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root()); } + + !should_drop_lookup }); } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 13efd36ab7e..209dd07a1f4 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -261,6 +261,14 @@ impl SingleBlockLookup { fn use_rand_available_peer(&mut self) -> Option { self.peers.iter().choose(&mut rand::thread_rng()).copied() } + /// Checks if the peer is disconnected. + /// + /// Returns true if the lookup should be dropped. + pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool { + self.remove_peer(peer_id); + + return self.has_no_peers(); + } } /// The state of the blob request component of a `SingleBlockLookup`. 
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index a607151bde9..fd381dd23c4 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -560,6 +560,8 @@ impl TestRig { error: RPCError::Disconnected, }); } + fn peer_disconnected(&mut self, peer_id: PeerId) { + self.send_sync_message(SyncMessage::Disconnect(peer_id)); } fn drain_network_rx(&mut self) { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 4c1a1e6b67a..5dc419f8a55 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -381,7 +381,9 @@ impl SyncManager { self.range_sync.peer_disconnect(&mut self.network, peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. - let _ = self.backfill_sync.peer_disconnected(peer_id); + let _ = self + .backfill_sync + .peer_disconnected(peer_id, &mut self.network); self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 63cafa9aca4..a735b0f6adb 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -174,8 +174,30 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. - pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { - self.peers.remove(peer_id); + pub fn remove_peer( + &mut self, + peer_id: &PeerId, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { + if let Some(batch_ids) = self.peers.remove(peer_id) { + // fail the batches + for id in batch_ids { + if let Some(batch) = self.batches.get_mut(&id) { + if let BatchOperationOutcome::Failed { blacklist } = + batch.download_failed(true)? 
+ { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: id, + }); + } + self.retry_batch_download(network, id)?; + } else { + debug!(self.log, "Batch not found while removing peer"; + "peer" => %peer_id, "batch" => id) + } + } + } if self.peers.is_empty() { Err(RemoveChain::EmptyPeerPool) diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index fe48db35b45..c8e82666840 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -278,8 +278,9 @@ where /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type, remove_reason) in - self.chains.call_all(|chain| chain.remove_peer(peer_id)) + for (removed_chain, sync_type, remove_reason) in self + .chains + .call_all(|chain| chain.remove_peer(peer_id, network)) { self.on_chain_removed( removed_chain, From 9d90e39fab581250e732b13724e111f77de10c10 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 21 Jun 2024 17:34:01 +0530 Subject: [PATCH 02/21] Cleanup map entries after disconnect --- .../network/src/sync/block_lookups_review.md | 0 .../src/sync/block_sidecar_coupling.rs | 6 +++- beacon_node/network/src/sync/manager.rs | 33 ++++++++++++++++++- .../network/src/sync/network_context.rs | 23 ++++++++++--- .../src/sync/network_context/requests.rs | 13 ++++++-- 5 files changed, 66 insertions(+), 9 deletions(-) create mode 100644 beacon_node/network/src/sync/block_lookups_review.md diff --git a/beacon_node/network/src/sync/block_lookups_review.md b/beacon_node/network/src/sync/block_lookups_review.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 
d159733cbc7..7427f70e69d 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,4 +1,5 @@ use beacon_chain::block_verification_types::RpcBlock; +use lighthouse_network::PeerId; use ssz_types::VariableList; use std::{collections::VecDeque, sync::Arc}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; @@ -17,16 +18,19 @@ pub struct BlocksAndBlobsRequestInfo { is_sidecars_stream_terminated: bool, /// Used to determine if this accumulator should wait for a sidecars stream termination request_type: ByRangeRequestType, + /// TODO(pawan): would this be multiple peer ids? + pub(crate) peer_id: PeerId, } impl BlocksAndBlobsRequestInfo { - pub fn new(request_type: ByRangeRequestType) -> Self { + pub fn new(request_type: ByRangeRequestType, peer_id: PeerId) -> Self { Self { accumulated_blocks: <_>::default(), accumulated_sidecars: <_>::default(), is_blocks_stream_terminated: <_>::default(), is_sidecars_stream_terminated: <_>::default(), request_type, + peer_id, } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5dc419f8a55..08460067726 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -372,11 +372,24 @@ impl SyncManager { Err(_) => self.update_sync_state(), }, } + } else { + debug!( + self.log, + "RPC error for range request has no associated entry in network context, ungraceful disconnect"; + "peer_id" => %peer_id, + "request_id" => %id, + "error" => ?error, + ); } } } } + /// Handles a peer disconnect. + /// + /// It is important that a peer disconnect retries all the batches/lookups as + /// there is no way to guarantee that libp2p always emits a error along with + /// the disconnect. 
fn peer_disconnect(&mut self, peer_id: &PeerId) { self.range_sync.peer_disconnect(&mut self.network, peer_id); self.block_lookups.peer_disconnected(peer_id); @@ -384,6 +397,10 @@ impl SyncManager { let _ = self .backfill_sync .peer_disconnected(peer_id, &mut self.network); + // Note: the order is important here, we should only remove the requests + // from network context's maps after we have initiated the retries for + // the associated batches/lookups + self.network.peer_disconnected(peer_id.clone()); self.update_sync_state(); } @@ -851,6 +868,13 @@ impl SyncManager { resp, &mut self.network, ) + } else { + debug!( + self.log, + "RPC error for block lookup has no associated entry in network context, ungraceful disconnect"; + "peer_id" => %peer_id, + "request_id" => ?id, + ); } } @@ -893,6 +917,13 @@ impl SyncManager { resp, &mut self.network, ) + } else { + debug!( + self.log, + "RPC error for blob lookup has no associated entry in network context, ungraceful disconnect"; + "peer_id" => %peer_id, + "request_id" => ?id, + ); } } @@ -953,7 +984,7 @@ impl SyncManager { self.network.insert_range_blocks_and_blobs_request( id, resp.sender_id, - BlocksAndBlobsRequestInfo::new(resp.request_type), + BlocksAndBlobsRequestInfo::new(resp.request_type, peer_id), ); // inform range that the request needs to be treated as failed // With time we will want to downgrade this log diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index f3f82ee011f..6c8286f2818 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -177,6 +177,16 @@ impl SyncNetworkContext { } } + /// Remove all active requests associated with the peer. 
+ pub fn peer_disconnected(&mut self, peer_id: PeerId) { + self.blocks_by_root_requests + .retain(|_, request| request.peer_id != peer_id); + self.blobs_by_root_requests + .retain(|_, request| request.peer_id != peer_id); + self.range_blocks_and_blobs_requests + .retain(|_, request| request.1.peer_id != peer_id); + } + pub fn network_globals(&self) -> &NetworkGlobals { &self.network_beacon_processor.network_globals } @@ -272,8 +282,13 @@ impl SyncNetworkContext { sender_id: RangeRequestId, ) -> Result { let id = self.blocks_by_range_request(peer_id, batch_type, request)?; - self.range_blocks_and_blobs_requests - .insert(id, (sender_id, BlocksAndBlobsRequestInfo::new(batch_type))); + self.range_blocks_and_blobs_requests.insert( + id, + ( + sender_id, + BlocksAndBlobsRequestInfo::new(batch_type, peer_id), + ), + ); Ok(id) } @@ -375,7 +390,7 @@ impl SyncNetworkContext { .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blocks_by_root_requests - .insert(id, ActiveBlocksByRootRequest::new(request)); + .insert(id, ActiveBlocksByRootRequest::new(request, peer_id)); Ok(LookupRequestResult::RequestSent(req_id)) } @@ -453,7 +468,7 @@ impl SyncNetworkContext { .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blobs_by_root_requests - .insert(id, ActiveBlobsByRootRequest::new(request)); + .insert(id, ActiveBlobsByRootRequest::new(request, peer_id)); Ok(LookupRequestResult::RequestSent(req_id)) } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 6e4683701bd..8387e9b0e1a 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,5 +1,8 @@ use beacon_chain::get_block_root; -use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}; +use lighthouse_network::{ + rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}, + PeerId, +}; use std::sync::Arc; use strum::IntoStaticStr; use 
types::{ @@ -20,13 +23,15 @@ pub enum LookupVerifyError { pub struct ActiveBlocksByRootRequest { request: BlocksByRootSingleRequest, resolved: bool, + pub(crate) peer_id: PeerId, } impl ActiveBlocksByRootRequest { - pub fn new(request: BlocksByRootSingleRequest) -> Self { + pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self { Self { request, resolved: false, + peer_id, } } @@ -94,14 +99,16 @@ pub struct ActiveBlobsByRootRequest { request: BlobsByRootSingleBlockRequest, blobs: Vec>>, resolved: bool, + pub(crate) peer_id: PeerId, } impl ActiveBlobsByRootRequest { - pub fn new(request: BlobsByRootSingleBlockRequest) -> Self { + pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self { Self { request, blobs: vec![], resolved: false, + peer_id, } } From 70af7d1ed4f47d1da22ba3ad23d4c4ed94428752 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 21 Jun 2024 18:02:40 +0530 Subject: [PATCH 03/21] Allow lookups to continue in case of disconnections --- beacon_node/network/src/sync/block_lookups/mod.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a47c9f3840e..a0a8b69c31c 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -411,16 +411,9 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|_, req| { - let should_drop_lookup = - req.should_drop_lookup_on_disconnected_peer(peer_id ); - - if should_drop_lookup { - debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root()); - } - - !should_drop_lookup - }); + for (_, lookup) in self.single_block_lookups.iter_mut() { + lookup.remove_peer(peer_id); + } } /* Processing responses */ From 312be2c492536ccfc238b87fd688208b561484f9 
Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 21 Jun 2024 21:43:33 +0530 Subject: [PATCH 04/21] Pretty response types --- .../network/src/sync/block_lookups/mod.rs | 6 +-- beacon_node/network/src/sync/manager.rs | 50 +++++++++++-------- .../network/src/sync/network_context.rs | 39 ++++++++------- 3 files changed, 54 insertions(+), 41 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a0a8b69c31c..93341783751 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -2,7 +2,7 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; -use super::network_context::{RpcResponseResult, SyncNetworkContext}; +use super::network_context::{RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; @@ -337,7 +337,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: RpcResponseResult, + response: Result<(R::VerifiedResponseType, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, peer_id, response, cx); @@ -349,7 +349,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: RpcResponseResult, + response: Result<(R::VerifiedResponseType, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) -> Result { // Note: no need to downscore peers here, already downscored on network context diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 08460067726..74a520c66c9 100644 --- 
a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,9 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; +use super::network_context::{ + BlockOrBlob, RangeRequestId, RpcEvent, RpcResponseResult, SyncNetworkContext, +}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; @@ -860,21 +862,24 @@ impl SyncManager { peer_id: PeerId, block: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { - self.block_lookups + match self.network.on_single_block_response(id, peer_id, block) { + RpcResponseResult::Response(resp) => self + .block_lookups .on_download_response::>( id, peer_id, resp, &mut self.network, - ) - } else { - debug!( - self.log, - "RPC error for block lookup has no associated entry in network context, ungraceful disconnect"; - "peer_id" => %peer_id, - "request_id" => ?id, - ); + ), + RpcResponseResult::RequestNotFound => { + debug!( + self.log, + "RPC error for block lookup has no associated entry in network context, ungraceful disconnect"; + "peer_id" => %peer_id, + "request_id" => ?id, + ); + } + RpcResponseResult::NoOp | RpcResponseResult::StreamTermination => {} } } @@ -909,21 +914,24 @@ impl SyncManager { peer_id: PeerId, blob: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { - self.block_lookups + match self.network.on_single_blob_response(id, peer_id, blob) { + RpcResponseResult::Response(resp) => self + .block_lookups .on_download_response::>( id, peer_id, resp, &mut self.network, - ) - } else { - debug!( - self.log, - "RPC error for blob lookup has no associated entry in network context, ungraceful 
disconnect"; - "peer_id" => %peer_id, - "request_id" => ?id, - ); + ), + RpcResponseResult::RequestNotFound => { + debug!( + self.log, + "RPC error for blob lookup has no associated entry in network context, ungraceful disconnect"; + "peer_id" => %peer_id, + "request_id" => ?id, + ); + } + RpcResponseResult::NoOp | RpcResponseResult::StreamTermination => {} } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 6c8286f2818..67675a79700 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -52,7 +52,12 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; +pub enum RpcResponseResult { + Response(Result<(T, Duration), RpcResponseError>), + StreamTermination, + RequestNotFound, + NoOp, +} pub enum RpcResponseError { RpcError(RPCError), @@ -582,36 +587,36 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, block: RpcEvent>>, - ) -> Option>>> { + ) -> RpcResponseResult>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { - return None; + return RpcResponseResult::RequestNotFound; }; let resp = match block { RpcEvent::Response(block, seen_timestamp) => { match request.get_mut().add_response(block) { - Ok(block) => Ok((block, seen_timestamp)), + Ok(block) => RpcResponseResult::Response(Ok((block, seen_timestamp))), Err(e) => { // The request must be dropped after receiving an error. 
request.remove(); - Err(e.into()) + RpcResponseResult::Response(Err(e.into())) } } } RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, - Err(e) => Err(e.into()), + Ok(_) => return RpcResponseResult::StreamTermination, + Err(e) => RpcResponseResult::Response(Err(e.into())), }, RpcEvent::RPCError(e) => { request.remove(); - Err(e.into()) + RpcResponseResult::Response(Err(e.into())) } }; - if let Err(RpcResponseError::VerifyError(e)) = &resp { + if let RpcResponseResult::Response(Err(RpcResponseError::VerifyError(e))) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } - Some(resp) + resp } pub fn on_single_blob_response( @@ -619,9 +624,9 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, blob: RpcEvent>>, - ) -> Option>> { + ) -> RpcResponseResult> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { - return None; + return RpcResponseResult::RequestNotFound; }; let resp = match blob { @@ -631,12 +636,12 @@ impl SyncNetworkContext { Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) .map(|blobs| (blobs, seen_timestamp)) .map_err(|e| (e.into(), request.resolve())), - Ok(None) => return None, + Ok(None) => return RpcResponseResult::NoOp, Err(e) => Err((e.into(), request.resolve())), } } RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return None, + Ok(_) => return RpcResponseResult::StreamTermination, // (err, false = not resolved) because terminate returns Ok() if resolved Err(e) => Err((e.into(), false)), }, @@ -644,7 +649,7 @@ impl SyncNetworkContext { }; match resp { - Ok(resp) => Some(Ok(resp)), + Ok(resp) => RpcResponseResult::Response(Ok(resp)), // Track if this request has already returned some value downstream. Ensure that // downstream code only receives a single Result per request. If the serving peer does // multiple penalizable actions per request, downscore and return None. 
This allows to @@ -655,9 +660,9 @@ impl SyncNetworkContext { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } if resolved { - None + RpcResponseResult::NoOp } else { - Some(Err(e)) + RpcResponseResult::Response(Err(e)) } } } From a38caf472e02dbc6e0ee507a7e6d7d67e0577a73 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 21 Jun 2024 21:43:41 +0530 Subject: [PATCH 05/21] fmt --- .../network/src/sync/block_lookups/single_block_lookup.rs | 8 -------- beacon_node/network/src/sync/block_lookups/tests.rs | 2 ++ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 209dd07a1f4..13efd36ab7e 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -261,14 +261,6 @@ impl SingleBlockLookup { fn use_rand_available_peer(&mut self) -> Option { self.peers.iter().choose(&mut rand::thread_rng()).copied() } - /// Checks if the peer is disconnected. - /// - /// Returns true if the lookup should be dropped. - pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool { - self.remove_peer(peer_id); - - return self.has_no_peers(); - } } /// The state of the blob request component of a `SingleBlockLookup`. 
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index fd381dd23c4..9790060fe25 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -560,6 +560,8 @@ impl TestRig { error: RPCError::Disconnected, }); } + } + fn peer_disconnected(&mut self, peer_id: PeerId) { self.send_sync_message(SyncMessage::Disconnect(peer_id)); } From c7fd21de4588d10d1ce974c9191a0bbe022844bd Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 24 Jun 2024 10:50:37 +0530 Subject: [PATCH 06/21] Fix lints --- beacon_node/network/src/sync/block_lookups/tests.rs | 4 ---- beacon_node/network/src/sync/block_sidecar_coupling.rs | 10 +++++++--- beacon_node/network/src/sync/manager.rs | 2 +- beacon_node/network/src/sync/network_context.rs | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 9790060fe25..52c83e7facf 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -539,10 +539,6 @@ impl TestRig { }) } - fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) { - self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id)); - } - /// Return RPCErrors for all active requests of peer fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) { self.drain_network_rx(); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 7427f70e69d..f31f2921ea2 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -18,7 +18,7 @@ pub struct BlocksAndBlobsRequestInfo { is_sidecars_stream_terminated: bool, /// Used to determine if this accumulator should wait for a sidecars stream 
termination request_type: ByRangeRequestType, - /// TODO(pawan): would this be multiple peer ids? + /// The peer the request was made to. pub(crate) peer_id: PeerId, } @@ -113,12 +113,14 @@ mod tests { use super::BlocksAndBlobsRequestInfo; use crate::sync::range_sync::ByRangeRequestType; use beacon_chain::test_utils::{generate_rand_block_and_blobs, NumBlobs}; + use lighthouse_network::PeerId; use rand::SeedableRng; use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E}; #[test] fn no_blobs_into_responses() { - let mut info = BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::Blocks); + let peer_id = PeerId::random(); + let mut info = BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::Blocks, peer_id); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng).0) @@ -137,7 +139,9 @@ mod tests { #[test] fn empty_blobs_into_responses() { - let mut info = BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::BlocksAndBlobs); + let peer_id = PeerId::random(); + let mut info = + BlocksAndBlobsRequestInfo::::new(ByRangeRequestType::BlocksAndBlobs, peer_id); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 74a520c66c9..7f799e99d0f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -402,7 +402,7 @@ impl SyncManager { // Note: the order is important here, we should only remove the requests // from network context's maps after we have initiated the retries for // the associated batches/lookups - self.network.peer_disconnected(peer_id.clone()); + self.network.peer_disconnected(peer_id); self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 67675a79700..978f05b2f89 100644 --- 
a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -183,13 +183,13 @@ impl SyncNetworkContext { } /// Remove all active requests associated with the peer. - pub fn peer_disconnected(&mut self, peer_id: PeerId) { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) { self.blocks_by_root_requests - .retain(|_, request| request.peer_id != peer_id); + .retain(|_, request| request.peer_id != *peer_id); self.blobs_by_root_requests - .retain(|_, request| request.peer_id != peer_id); + .retain(|_, request| request.peer_id != *peer_id); self.range_blocks_and_blobs_requests - .retain(|_, request| request.1.peer_id != peer_id); + .retain(|_, request| request.1.peer_id != *peer_id); } pub fn network_globals(&self) -> &NetworkGlobals { From a8f64f224908e48d0b2ac37ed217252d06ef6165 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 24 Jun 2024 13:03:30 +0530 Subject: [PATCH 07/21] Remove lookup if it cannot progress --- .../network/src/sync/block_lookups/mod.rs | 7 ++++-- .../sync/block_lookups/single_block_lookup.rs | 22 +++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 93341783751..5e35cabdc90 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -411,9 +411,12 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId) { /* Check disconnection for single lookups */ - for (_, lookup) in self.single_block_lookups.iter_mut() { + self.single_block_lookups.retain(|_, lookup| { lookup.remove_peer(peer_id); - } + // Keep the lookup if it was some peers that can drive progress + // or if it has some downloaded components that can be processed + !lookup.has_no_peers() || lookup.can_progress_without_peer() + }) } /* Processing responses */ diff --git 
a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 13efd36ab7e..2b9826a7e58 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -246,10 +246,15 @@ impl SingleBlockLookup { self.peers.insert(peer_id) } - /// Remove peer from available peers. Return true if there are no more available peers and all - /// requests are not expecting any future event (AwaitingDownload). - pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool { - self.peers.remove(peer_id) + /// Remove peer from available peers. + pub fn remove_peer(&mut self, peer_id: &PeerId) { + self.peers.remove(peer_id); + } + + /// Returns true if a lookup has some downloaded components that can be processed. + pub fn can_progress_without_peer(&self) -> bool { + self.block_request_state.state.can_progress_without_peer() + || self.blob_request_state.state.can_progress_without_peer() } /// Returns true if this lookup has zero peers @@ -349,6 +354,15 @@ impl SingleLookupRequestState { } } + pub fn can_progress_without_peer(&self) -> bool { + match self.state { + State::AwaitingDownload { .. } | State::Downloading { .. } => false, + State::AwaitingProcess { .. } | State::Processing { .. } | State::Processed { .. } => { + true + } + } + } + pub fn is_processed(&self) -> bool { match self.state { State::AwaitingDownload { .. 
} From bc10fb26b7e2095fe86fb3954b58f6bf7fdd530a Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 24 Jun 2024 15:25:56 +0530 Subject: [PATCH 08/21] Fix tests --- .../network/src/sync/block_lookups/tests.rs | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 52c83e7facf..6369e4eb05f 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1026,6 +1026,25 @@ fn test_single_block_lookup_failure() { rig.expect_empty_network(); } +#[test] +fn test_single_block_lookup_peer_disconnected_then_rpc_error() { + let mut rig = TestRig::test_setup(); + + let block_hash = Hash256::random(); + let peer_id = rig.new_connected_peer(); + + // Trigger the request + rig.trigger_unknown_block_from_attestation(block_hash, peer_id); + let id = rig.expect_block_lookup_request(block_hash); + + // The peer disconnect event reaches sync before the rpc error + rig.peer_disconnected(peer_id); + // The request fails + rig.single_lookup_failed(id, peer_id, RPCError::Disconnected); + // The request should be removed from the network context on disconnection + rig.expect_empty_network(); +} + #[test] fn test_single_block_lookup_becomes_parent_request() { let mut rig = TestRig::test_setup(); @@ -1289,19 +1308,10 @@ fn test_lookup_peer_disconnected_no_peers_left_while_request() { rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); rig.peer_disconnected(peer_id); rig.rpc_error_all_active_requests(peer_id); - rig.expect_no_active_lookups(); -} - -#[test] -fn test_lookup_peer_disconnected_no_peers_left_not_while_request() { - let mut rig = TestRig::test_setup(); - let peer_id = rig.new_connected_peer(); - let trigger_block = rig.rand_block(); - rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); - rig.peer_disconnected(peer_id); - // Note: this test case 
may be removed in the future. It's not strictly necessary to drop a - // lookup if there are no peers left. Lookup should only be dropped if it can not make progress - rig.expect_no_active_lookups(); + // Erroring all rpc requests and disconnecting the peer shouldn't remove the active + // request as we can still progress. + // The parent lookup trigger in this case can still be progressed so it shouldn't be removed. + rig.assert_single_lookups_count(1); } #[test] From cd17f9f1baee47e114047a1feb36400d5d97ef42 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 24 Jun 2024 15:39:34 +0530 Subject: [PATCH 09/21] Remove poll_close on rpc behaviour --- .../lighthouse_network/src/rpc/handler.rs | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index b7166efc376..6f338ebc8be 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -352,37 +352,6 @@ where !matches!(self.state, HandlerState::Deactivated) } - // NOTE: This function gets polled to completion upon a connection close. 
- fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { - // Inform the network behaviour of any failed requests - - while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() { - let outbound_info = self - .outbound_substreams - .remove(&substream_id) - .expect("The value must exist for a key"); - // If the state of the connection is closing, we do not need to report this case to - // the behaviour, as the connection has just closed non-gracefully - if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) { - continue; - } - - // Register this request as an RPC Error - return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound { - error: RPCError::Disconnected, - proto: outbound_info.proto, - id: outbound_info.req_id, - }))); - } - - // Also handle any events that are awaiting to be sent to the behaviour - if !self.events_out.is_empty() { - return Poll::Ready(Some(self.events_out.remove(0))); - } - - Poll::Ready(None) - } - fn poll( &mut self, cx: &mut Context<'_>, From 6ead176ed0177bf7320203c69e2c73fceaaaef9d Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 24 Jun 2024 18:13:24 +0530 Subject: [PATCH 10/21] Remove redundant test --- .../lighthouse_network/tests/rpc_tests.rs | 94 +------------------ 1 file changed, 1 insertion(+), 93 deletions(-) diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 8d29f5158bc..527b853dc30 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -3,7 +3,7 @@ mod common; use common::Protocol; -use lighthouse_network::rpc::{methods::*, RPCError}; +use lighthouse_network::rpc::methods::*; use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response}; use slog::{debug, warn, Level}; use ssz::Encode; @@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { }) } -#[test] -fn 
test_disconnect_triggers_rpc_error() { - // set up the logging. The level and enabled logging or not - let log_level = Level::Debug; - let enable_logging = false; - - let log = common::build_log(log_level, enable_logging); - let spec = E::default_spec(); - - let rt = Arc::new(Runtime::new().unwrap()); - // get sender/receiver - rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair( - Arc::downgrade(&rt), - &log, - ForkName::Base, - &spec, - Protocol::Tcp, - ) - .await; - - // BlocksByRoot Request - let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new( - // Must have at least one root for the request to create a stream - vec![Hash256::from_low_u64_be(0)], - &spec, - )); - - // build the sender future - let sender_future = async { - loop { - match sender.next_event().await { - NetworkEvent::PeerConnectedOutgoing(peer_id) => { - // Send a STATUS message - debug!(log, "Sending RPC"); - sender - .send_request(peer_id, 42, rpc_request.clone()) - .unwrap(); - } - NetworkEvent::RPCFailed { error, id: 42, .. } => match error { - RPCError::Disconnected => return, - other => panic!("received unexpected error {:?}", other), - }, - other => { - warn!(log, "Ignoring other event {:?}", other); - } - } - } - }; - - // determine messages to send (PeerId, RequestId). If some, indicates we still need to send - // messages - let mut sending_peer = None; - let receiver_future = async { - loop { - // this future either drives the sending/receiving or times out allowing messages to be - // sent in the timeout - match futures::future::select( - Box::pin(receiver.next_event()), - Box::pin(tokio::time::sleep(Duration::from_secs(1))), - ) - .await - { - futures::future::Either::Left((ev, _)) => match ev { - NetworkEvent::RequestReceived { peer_id, .. 
} => { - sending_peer = Some(peer_id); - } - other => { - warn!(log, "Ignoring other event {:?}", other); - } - }, - futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required - } - - // if we need to send messages send them here. This will happen after a delay - if let Some(peer_id) = sending_peer.take() { - warn!(log, "Receiver got request, disconnecting peer"); - receiver.__hard_disconnect_testing_only(peer_id); - } - } - }; - - tokio::select! { - _ = sender_future => {} - _ = receiver_future => {} - _ = sleep(Duration::from_secs(30)) => { - panic!("Future timed out"); - } - } - }) -} - /// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC /// Goodbye message. fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) { From 3e1c41a18a2212f5f6be191cc80e62fb1dd71e38 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 01:45:20 +0530 Subject: [PATCH 11/21] Fix issue raised by lion --- beacon_node/network/src/sync/manager.rs | 16 ++++--- .../network/src/sync/network_context.rs | 46 +++++++++++++++---- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7f799e99d0f..28fe43a5e88 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -393,16 +393,20 @@ impl SyncManager { /// there is no way to guarantee that libp2p always emits a error along with /// the disconnect. 
fn peer_disconnect(&mut self, peer_id: &PeerId) { + // Inject a Disconnected error on all requests associated with the disconnected peer + // to retry all batches/lookups + for request_id in self.network.peer_disconnected(peer_id) { + self.inject_error(*peer_id, request_id, RPCError::Disconnected); + } + + // Remove peer from all data structures self.range_sync.peer_disconnect(&mut self.network, peer_id); - self.block_lookups.peer_disconnected(peer_id); - // Regardless of the outcome, we update the sync status. let _ = self .backfill_sync .peer_disconnected(peer_id, &mut self.network); - // Note: the order is important here, we should only remove the requests - // from network context's maps after we have initiated the retries for - // the associated batches/lookups - self.network.peer_disconnected(peer_id); + self.block_lookups.peer_disconnected(peer_id); + + // Regardless of the outcome, we update the sync status. self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 978f05b2f89..ecc05c082ee 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -182,14 +182,44 @@ impl SyncNetworkContext { } } - /// Remove all active requests associated with the peer. 
- pub fn peer_disconnected(&mut self, peer_id: &PeerId) { - self.blocks_by_root_requests - .retain(|_, request| request.peer_id != *peer_id); - self.blobs_by_root_requests - .retain(|_, request| request.peer_id != *peer_id); - self.range_blocks_and_blobs_requests - .retain(|_, request| request.1.peer_id != *peer_id); + /// Returns all ids that were requested to the given peer_id + pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec { + let failed_range_ids = + self.range_blocks_and_blobs_requests + .iter() + .filter_map(|(id, request)| { + if request.1.peer_id == *peer_id { + Some(SyncRequestId::RangeBlockAndBlobs { id: id.clone() }) + } else { + None + } + }); + + let failed_block_ids = self + .blocks_by_root_requests + .iter() + .filter_map(|(id, request)| { + if request.peer_id == *peer_id { + Some(SyncRequestId::SingleBlock { id: id.clone() }) + } else { + None + } + }); + let failed_blob_ids = self + .blobs_by_root_requests + .iter() + .filter_map(|(id, request)| { + if request.peer_id == *peer_id { + Some(SyncRequestId::SingleBlob { id: id.clone() }) + } else { + None + } + }); + + failed_range_ids + .chain(failed_block_ids) + .chain(failed_blob_ids) + .collect() } pub fn network_globals(&self) -> &NetworkGlobals { From dfaf2384951f4310f7f7c23eef0547482ff576c0 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 01:56:29 +0530 Subject: [PATCH 12/21] Revert pretty response types --- .../network/src/sync/block_lookups/mod.rs | 6 +-- beacon_node/network/src/sync/manager.rs | 36 ++++------------- .../network/src/sync/network_context.rs | 39 ++++++++----------- 3 files changed, 27 insertions(+), 54 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 5e35cabdc90..22e7e289a5d 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -2,7 +2,7 @@ use self::parent_chain::{compute_parent_chains, 
NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; -use super::network_context::{RpcResponseError, SyncNetworkContext}; +use super::network_context::{RpcResponseResult, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; @@ -337,7 +337,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: Result<(R::VerifiedResponseType, Duration), RpcResponseError>, + response: RpcResponseResult, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, peer_id, response, cx); @@ -349,7 +349,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: Result<(R::VerifiedResponseType, Duration), RpcResponseError>, + response: RpcResponseResult, cx: &mut SyncNetworkContext, ) -> Result { // Note: no need to downscore peers here, already downscored on network context diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 28fe43a5e88..0f8cab18c92 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,9 +35,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{ - BlockOrBlob, RangeRequestId, RpcEvent, RpcResponseResult, SyncNetworkContext, -}; +use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; @@ -866,24 +864,14 @@ impl SyncManager { peer_id: PeerId, block: RpcEvent>>, ) { 
- match self.network.on_single_block_response(id, peer_id, block) { - RpcResponseResult::Response(resp) => self - .block_lookups + if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { + self.block_lookups .on_download_response::>( id, peer_id, resp, &mut self.network, - ), - RpcResponseResult::RequestNotFound => { - debug!( - self.log, - "RPC error for block lookup has no associated entry in network context, ungraceful disconnect"; - "peer_id" => %peer_id, - "request_id" => ?id, - ); - } - RpcResponseResult::NoOp | RpcResponseResult::StreamTermination => {} + ) } } @@ -918,24 +906,14 @@ impl SyncManager { peer_id: PeerId, blob: RpcEvent>>, ) { - match self.network.on_single_blob_response(id, peer_id, blob) { - RpcResponseResult::Response(resp) => self - .block_lookups + if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { + self.block_lookups .on_download_response::>( id, peer_id, resp, &mut self.network, - ), - RpcResponseResult::RequestNotFound => { - debug!( - self.log, - "RPC error for blob lookup has no associated entry in network context, ungraceful disconnect"; - "peer_id" => %peer_id, - "request_id" => ?id, - ); - } - RpcResponseResult::NoOp | RpcResponseResult::StreamTermination => {} + ) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ecc05c082ee..d00b620b558 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -52,12 +52,7 @@ pub enum RpcEvent { RPCError(RPCError), } -pub enum RpcResponseResult { - Response(Result<(T, Duration), RpcResponseError>), - StreamTermination, - RequestNotFound, - NoOp, -} +pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; pub enum RpcResponseError { RpcError(RPCError), @@ -617,36 +612,36 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, block: RpcEvent>>, - ) -> RpcResponseResult>> { + ) -> 
Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { - return RpcResponseResult::RequestNotFound; + return None; }; let resp = match block { RpcEvent::Response(block, seen_timestamp) => { match request.get_mut().add_response(block) { - Ok(block) => RpcResponseResult::Response(Ok((block, seen_timestamp))), + Ok(block) => Ok((block, seen_timestamp)), Err(e) => { // The request must be dropped after receiving an error. request.remove(); - RpcResponseResult::Response(Err(e.into())) + Err(e.into()) } } } RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return RpcResponseResult::StreamTermination, - Err(e) => RpcResponseResult::Response(Err(e.into())), + Ok(_) => return None, + Err(e) => Err(e.into()), }, RpcEvent::RPCError(e) => { request.remove(); - RpcResponseResult::Response(Err(e.into())) + Err(e.into()) } }; - if let RpcResponseResult::Response(Err(RpcResponseError::VerifyError(e))) = &resp { + if let Err(RpcResponseError::VerifyError(e)) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } - resp + Some(resp) } pub fn on_single_blob_response( @@ -654,9 +649,9 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, blob: RpcEvent>>, - ) -> RpcResponseResult> { + ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { - return RpcResponseResult::RequestNotFound; + return None; }; let resp = match blob { @@ -666,12 +661,12 @@ impl SyncNetworkContext { Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) .map(|blobs| (blobs, seen_timestamp)) .map_err(|e| (e.into(), request.resolve())), - Ok(None) => return RpcResponseResult::NoOp, + Ok(None) => return None, Err(e) => Err((e.into(), request.resolve())), } } RpcEvent::StreamTermination => match request.remove().terminate() { - Ok(_) => return RpcResponseResult::StreamTermination, + Ok(_) => return None, // (err, false = not resolved) because 
terminate returns Ok() if resolved Err(e) => Err((e.into(), false)), }, @@ -679,7 +674,7 @@ impl SyncNetworkContext { }; match resp { - Ok(resp) => RpcResponseResult::Response(Ok(resp)), + Ok(resp) => Some(Ok(resp)), // Track if this request has already returned some value downstream. Ensure that // downstream code only receives a single Result per request. If the serving peer does // multiple penalizable actions per request, downscore and return None. This allows to @@ -690,9 +685,9 @@ impl SyncNetworkContext { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } if resolved { - RpcResponseResult::NoOp + None } else { - RpcResponseResult::Response(Err(e)) + Some(Err(e)) } } } From b930c7c2a2082774c8240c928c444cfc8ea8291d Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 01:58:47 +0530 Subject: [PATCH 13/21] Cleanup --- beacon_node/network/src/sync/block_lookups_review.md | 0 beacon_node/network/src/sync/network_context.rs | 6 +++--- 2 files changed, 3 insertions(+), 3 deletions(-) delete mode 100644 beacon_node/network/src/sync/block_lookups_review.md diff --git a/beacon_node/network/src/sync/block_lookups_review.md b/beacon_node/network/src/sync/block_lookups_review.md deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d00b620b558..388028f15f8 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -184,7 +184,7 @@ impl SyncNetworkContext { .iter() .filter_map(|(id, request)| { if request.1.peer_id == *peer_id { - Some(SyncRequestId::RangeBlockAndBlobs { id: id.clone() }) + Some(SyncRequestId::RangeBlockAndBlobs { id: *id }) } else { None } @@ -195,7 +195,7 @@ impl SyncNetworkContext { .iter() .filter_map(|(id, request)| { if request.peer_id == *peer_id { - Some(SyncRequestId::SingleBlock { id: id.clone() }) + Some(SyncRequestId::SingleBlock { 
id: *id }) } else { None } @@ -205,7 +205,7 @@ impl SyncNetworkContext { .iter() .filter_map(|(id, request)| { if request.peer_id == *peer_id { - Some(SyncRequestId::SingleBlob { id: id.clone() }) + Some(SyncRequestId::SingleBlob { id: *id }) } else { None } From 62167fbccacaf16a99e7fa964f8eed95d7a86329 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 02:30:53 +0530 Subject: [PATCH 14/21] Fix test --- beacon_node/network/src/sync/block_lookups/tests.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 6369e4eb05f..a5e671478f4 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1039,8 +1039,11 @@ fn test_single_block_lookup_peer_disconnected_then_rpc_error() { // The peer disconnect event reaches sync before the rpc error rig.peer_disconnected(peer_id); + // Only peer and no progress on lookup, so lookup should be removed + rig.expect_no_active_lookups(); // The request fails rig.single_lookup_failed(id, peer_id, RPCError::Disconnected); + rig.expect_block_lookup_request(block_hash); // The request should be removed from the network context on disconnection rig.expect_empty_network(); } From 26614f10adfa65c6cadfa62af2e77dd2bbe9a6eb Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 02:42:27 -0700 Subject: [PATCH 15/21] Apply suggestions from joao MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Oliveira --- beacon_node/network/src/sync/backfill_sync/mod.rs | 4 ++-- beacon_node/network/src/sync/block_lookups/mod.rs | 4 ++-- beacon_node/network/src/sync/block_lookups/tests.rs | 10 +++++----- beacon_node/network/src/sync/network_context.rs | 2 +- beacon_node/network/src/sync/range_sync/chain.rs | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git 
a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 29d209bbe38..5431e1bcdc1 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -320,7 +320,7 @@ impl BackFillSync { } if let Some(batch_ids) = self.active_requests.remove(peer_id) { - // fail the batches + // fail the batches. for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { match batch.download_failed(false) { @@ -335,7 +335,7 @@ impl BackFillSync { // If we have run out of peers in which to retry this batch, the backfill state // transitions to a paused state. // We still need to reset the state for all the affected batches, so we should not - // short circuit early + // short circuit early. if self.retry_batch_download(network, id).is_err() { debug!( self.log, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 22e7e289a5d..d707d566642 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -410,11 +410,11 @@ impl BlockLookups { /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId) { - /* Check disconnection for single lookups */ + // Check disconnection for single lookups. self.single_block_lookups.retain(|_, lookup| { lookup.remove_peer(peer_id); // Keep the lookup if it was some peers that can drive progress - // or if it has some downloaded components that can be processed + // or if it has some downloaded components that can be processed. 
!lookup.has_no_peers() || lookup.can_progress_without_peer() }) } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index a5e671478f4..0c3c69c133c 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1033,18 +1033,18 @@ fn test_single_block_lookup_peer_disconnected_then_rpc_error() { let block_hash = Hash256::random(); let peer_id = rig.new_connected_peer(); - // Trigger the request + // Trigger the request. rig.trigger_unknown_block_from_attestation(block_hash, peer_id); let id = rig.expect_block_lookup_request(block_hash); - // The peer disconnect event reaches sync before the rpc error + // The peer disconnect event reaches sync before the rpc error. rig.peer_disconnected(peer_id); - // Only peer and no progress on lookup, so lookup should be removed + // Only peer and no progress on lookup, so lookup should be removed. rig.expect_no_active_lookups(); - // The request fails + // The request fails. rig.single_lookup_failed(id, peer_id, RPCError::Disconnected); rig.expect_block_lookup_request(block_hash); - // The request should be removed from the network context on disconnection + // The request should be removed from the network context on disconnection. rig.expect_empty_network(); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 388028f15f8..d30cff00322 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -177,7 +177,7 @@ impl SyncNetworkContext { } } - /// Returns all ids that were requested to the given peer_id + /// Returns the ids of all the requests made to the given peer_id. 
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec { let failed_range_ids = self.range_blocks_and_blobs_requests diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a735b0f6adb..122e8287e61 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -180,7 +180,7 @@ impl SyncingChain { network: &mut SyncNetworkContext, ) -> ProcessingResult { if let Some(batch_ids) = self.peers.remove(peer_id) { - // fail the batches + // fail the batches. for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { if let BatchOperationOutcome::Failed { blacklist } = From ec90bf085cc16affc3bad4b7d627d9151908078f Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 15:13:40 +0530 Subject: [PATCH 16/21] Fix log --- beacon_node/network/src/sync/block_lookups/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index d707d566642..69ffd461b89 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -779,12 +779,12 @@ impl BlockLookups { }; if stuck_lookup.id == ancestor_stuck_lookup.id { - warn!(self.log, "Notify the devs, a sync lookup is stuck"; + warn!(self.log, "Notify the devs a sync lookup is stuck"; "block_root" => ?stuck_lookup.block_root(), "lookup" => ?stuck_lookup, ); } else { - warn!(self.log, "Notify the devs, a sync lookup is stuck"; + warn!(self.log, "Notify the devs a sync lookup is stuck"; "block_root" => ?stuck_lookup.block_root(), "lookup" => ?stuck_lookup, "ancestor_block_root" => ?ancestor_stuck_lookup.block_root(), From e79c71ae8b019762460cbd6599416b5fad41df18 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 15:31:39 +0530 Subject: [PATCH 17/21] update request status on no peers found --- 
.../network/src/sync/block_lookups/single_block_lookup.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 2b9826a7e58..2af4d8cfb9d 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -199,6 +199,9 @@ impl SingleBlockLookup { let Some(peer_id) = self.use_rand_available_peer() else { // Allow lookup to not have any peers. In that case do nothing. If the lookup does // not have peers for some time, it will be dropped. + R::request_state_mut(self) + .get_state_mut() + .update_awaiting_download_status("no peers"); return Ok(()); }; From caee7c8a853eacdf334fde38bb73d34cb2d92584 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 25 Jun 2024 19:22:52 +0530 Subject: [PATCH 18/21] Do not remove lookup after peer disconnection --- beacon_node/network/src/sync/block_lookups/mod.rs | 8 ++------ .../src/sync/block_lookups/single_block_lookup.rs | 15 --------------- .../network/src/sync/block_lookups/tests.rs | 13 +++++++------ 3 files changed, 9 insertions(+), 27 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 69ffd461b89..0a0da5d1bb9 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -410,13 +410,9 @@ impl BlockLookups { /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId) { - // Check disconnection for single lookups. - self.single_block_lookups.retain(|_, lookup| { + for (_, lookup) in self.single_block_lookups.iter_mut() { lookup.remove_peer(peer_id); - // Keep the lookup if it was some peers that can drive progress - // or if it has some downloaded components that can be processed. 
- !lookup.has_no_peers() || lookup.can_progress_without_peer() - }) + } } /* Processing responses */ diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 2af4d8cfb9d..c1ca9daf14b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -254,12 +254,6 @@ impl SingleBlockLookup { self.peers.remove(peer_id); } - /// Returns true if a lookup has some downloaded components that can be processed. - pub fn can_progress_without_peer(&self) -> bool { - self.block_request_state.state.can_progress_without_peer() - || self.blob_request_state.state.can_progress_without_peer() - } - /// Returns true if this lookup has zero peers pub fn has_no_peers(&self) -> bool { self.peers.is_empty() @@ -357,15 +351,6 @@ impl SingleLookupRequestState { } } - pub fn can_progress_without_peer(&self) -> bool { - match self.state { - State::AwaitingDownload { .. } | State::Downloading { .. } => false, - State::AwaitingProcess { .. } | State::Processing { .. } | State::Processed { .. } => { - true - } - } - } - pub fn is_processed(&self) -> bool { match self.state { State::AwaitingDownload { .. } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 0c3c69c133c..02b07fa43e1 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -290,6 +290,7 @@ impl TestRig { .0 } + #[track_caller] fn expect_no_active_single_lookups(&self) { assert!( self.active_single_lookups().is_empty(), @@ -298,6 +299,7 @@ impl TestRig { ); } + #[track_caller] fn expect_no_active_lookups(&self) { self.expect_no_active_single_lookups(); } @@ -1039,8 +1041,8 @@ fn test_single_block_lookup_peer_disconnected_then_rpc_error() { // The peer disconnect event reaches sync before the rpc error. 
rig.peer_disconnected(peer_id); - // Only peer and no progress on lookup, so lookup should be removed. - rig.expect_no_active_lookups(); + // The lookup is not removed as it can still potentially make progress. + rig.assert_single_lookups_count(1); // The request fails. rig.single_lookup_failed(id, peer_id, RPCError::Disconnected); rig.expect_block_lookup_request(block_hash); @@ -1311,10 +1313,9 @@ fn test_lookup_peer_disconnected_no_peers_left_while_request() { rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); rig.peer_disconnected(peer_id); rig.rpc_error_all_active_requests(peer_id); - // Erroring all rpc requests and disconnecting the peer shouldn't remove the active - // request as we can still progress. - // The parent lookup trigger in this case can still be progressed so it shouldn't be removed. - rig.assert_single_lookups_count(1); + // Erroring all rpc requests and disconnecting the peer shouldn't remove the requests + // from the lookups map as they can still progress. + rig.assert_single_lookups_count(2); } #[test] From 804f36dc3bd26e8aba6b046e0428bf52b721ac59 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 25 Jun 2024 16:08:28 +0200 Subject: [PATCH 19/21] Add comments about expected event api --- .../network/src/sync/block_lookups/mod.rs | 16 +++++++++++++ .../sync/block_lookups/single_block_lookup.rs | 20 ++++++++++++++++ .../network/src/sync/network_context.rs | 23 ++++++++++++++++++- 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 22e7e289a5d..d4278628084 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,3 +1,19 @@ +//! Implements block lookup sync. +//! +//! Block lookup sync is triggered when a peer claims to have imported a block we don't know about. +//! 
For example, a peer attesting to a head block root that is not in our fork-choice. Lookup sync +//! is recursive in nature, as we may discover that this attested head block root has a parent that +//! is also unknown to us. +//! +//! Block lookup is implemented as an event driven state machine. It sends events to the network and +//! beacon processor, and expects some set of events back. A discrepancy in the expected event API +//! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event +//! that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups +//! that live for too long, logging the line "Notify the devs a sync lookup is stuck". +//! +//! The expected event API is documented in the code paths that are making assumptions with the +//! comment prefix "Lookup sync event safety:" + use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 2b9826a7e58..5a8884cfae8 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -205,13 +205,21 @@ impl SingleBlockLookup { let request = R::request_state_mut(self); match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { LookupRequestResult::RequestSent(req_id) => { + // Lookup sync event safety: If make_request returns `RequestSent`, we are + // guaranteed that `BlockLookups::on_download_response` will be called exactly + // with this `req_id`. request.get_state_mut().on_download_start(req_id)? } LookupRequestResult::NoRequestNeeded => { + // Lookup sync event safety: Advances this request to the terminal `Processed` + // state. 
If all requests reach this state, the request is marked as completed + // in `Self::continue_requests`. request.get_state_mut().on_completed_request()? } // Sync will receive a future event to make progress on the request, do nothing now LookupRequestResult::Pending(reason) => { + // Lookup sync event safety: Refer to the code paths constructing + // `LookupRequestResult::Pending` request .get_state_mut() .update_awaiting_download_status(reason); @@ -222,16 +230,28 @@ impl SingleBlockLookup { // Otherwise, attempt to progress awaiting processing // If this request is awaiting a parent lookup to be processed, do not send for processing. // The request will be rejected with unknown parent error. + // + // TODO: The condition `block_is_processed || Block` can be dropped after checking for + // unknown parent root when importing RPC blobs } else if !awaiting_parent && (block_is_processed || matches!(R::response_type(), ResponseType::Block)) { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { + // Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed + // that `BlockLookups::on_processing_result` will be called exactly once with this + // lookup_id return R::send_for_processing(id, result, cx); } + // Lookup sync event safety: If the request is not in `AwaitingDownload` or + // `AwaitingProcess` state it is guaranteed to receive some event to make progress. } + // Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either: + // (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent + // lookup completes, or (2) get dropped if the parent fails and is dropped.
+ Ok(()) } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 388028f15f8..53f7c92f938 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -388,7 +388,10 @@ impl SyncNetworkContext { // Block is known are currently processing, expect a future event with the result of // processing. BlockProcessStatus::NotValidated { .. } => { - return Ok(LookupRequestResult::Pending("block in processing cache")) + // Lookup sync event safety: If the block is currently in the processing cache, we + // are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will + // make progress on this lookup + return Ok(LookupRequestResult::Pending("block in processing cache")); } // Block is fully validated. If it's not yet imported it's waiting for missing block // components. Consider this request completed and do nothing. @@ -411,6 +414,12 @@ impl SyncNetworkContext { let request = BlocksByRootSingleRequest(block_root); + // Lookup sync event safety: If network_send.send() returns Ok(_) we are guaranteed that + // eventually at least one of these 3 events will be received: + // - StreamTermination(request_id): handled by `Self::on_single_block_response` + // - RPCError(request_id): handled by `Self::on_single_block_response` + // - Disconnect(peer_id): handled by `Self::peer_disconnected` which converts it to a + // `RPCError(request_id)` event handled by the above method self.network_send .send(NetworkMessage::SendRequest { peer_id, @@ -453,6 +462,13 @@ impl SyncNetworkContext { // latter handle the case where if the peer sent no blobs, penalize. // - if `downloaded_block_expected_blobs` is Some = block is downloading or processing. // - if `num_expected_blobs` returns Some = block is processed. + // + // Lookup sync event safety: Reaching this code means that a block is not in any pre-import + // cache nor in the request state of this lookup.
Therefore, the block must either: (1) not + // be downloaded yet or (2) the block is already imported into the fork-choice. + // In case (1) the lookup must either successfully download the block or get dropped. + // In case (2) the block will be downloaded, processed, reach `BlockIsAlreadyKnown` and + // get dropped as completed. return Ok(LookupRequestResult::Pending("waiting for block download")); }; @@ -489,6 +505,7 @@ impl SyncNetworkContext { indices, }; + // Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call self.network_send .send(NetworkMessage::SendRequest { peer_id, @@ -705,6 +722,8 @@ impl SyncNetworkContext { .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); + // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync + // must receive a single `SyncMessage::BlockComponentProcessed` with this process type beacon_processor .send_rpc_beacon_block( block_root, @@ -734,6 +753,8 @@ impl SyncNetworkContext { .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); + // Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync + // must receive a single `SyncMessage::BlockComponentProcessed` event with this process type beacon_processor .send_rpc_blobs( block_root, From 9b2e9e0fef6b28041bd06135da3f7ddd07d85fe5 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 25 Jun 2024 16:17:05 +0200 Subject: [PATCH 20/21] Update single_block_lookup.rs --- .../network/src/sync/block_lookups/single_block_lookup.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 5a8884cfae8..6747023e11a 100644 --- 
a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -197,8 +197,12 @@ impl SingleBlockLookup { } let Some(peer_id) = self.use_rand_available_peer() else { - // Allow lookup to not have any peers. In that case do nothing. If the lookup does - // not have peers for some time, it will be dropped. + // Allow lookup to not have any peers and do nothing. This is an optimization to not + // lose progress of lookups created from a block with unknown parent before we receive + // attestations for said block. + // Lookup sync event safety: If a lookup requires peers to make progress, and does + // not receive any new peers for some time it will be dropped. If it receives a new + // peer it must attempt to make progress. return Ok(()); }; From cd6855095250c22dd895313495dd3715c7ee12e2 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 25 Jun 2024 16:25:45 +0200 Subject: [PATCH 21/21] Update mod.rs --- beacon_node/network/src/sync/block_lookups/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index d4278628084..3d960393541 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -5,7 +5,7 @@ //! is recursive in nature, as we may discover that this attested head block root has a parent that //! is also unknown to us. //! -//! Block lookup is implemented as an event driven state machine. It sends events to the network and +//! Block lookup is implemented as an event-driven state machine. It sends events to the network and //! beacon processor, and expects some set of events back. A discrepancy in the expected event API //! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event //! 
that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups @@ -13,6 +13,12 @@ //! //! The expected event API is documented in the code paths that are making assumptions with the //! comment prefix "Lookup sync event safety:" +//! +//! Block lookup sync attempts to not re-download or re-process data that we already have. Block +//! components are cached temporarily in multiple places before they are imported into fork-choice. +//! Therefore, block lookup sync must peek these caches correctly to decide when to skip a download +//! or consider a lookup complete. These caches are read from the `SyncNetworkContext` and its state +//! returned to this module as `LookupRequestResult` variants. use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult;