diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 3822b36b3d9..9f76390cefb 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -273,7 +273,7 @@ pub async fn publish_block( vec![], false, &log, + chain.spec.clone(), )); // Only a peer manager can add peers, so we create a dummy manager. diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 300c190cdaf..7b297d243bd 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -1232,6 +1232,7 @@ mod tests { vec![], false, &log, + spec.clone(), ); let keypair = keypair.into(); Discovery::new(keypair, &config, Arc::new(globals), &log, &spec) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 3e0ead01ce5..8d6ec78e437 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1383,7 +1383,8 @@ mod tests { ..Default::default() }; let log = build_log(slog::Level::Debug, false); - let globals = NetworkGlobals::new_test_globals(vec![], &log); + let spec = E::default_spec(); + let globals = NetworkGlobals::new_test_globals(vec![], &log, spec); PeerManager::new(config, Arc::new(globals), &log).unwrap() } @@ -1397,7 +1398,8 @@ mod tests { ..Default::default() }; let log = build_log(slog::Level::Debug, false); - let globals = NetworkGlobals::new_test_globals(trusted_peers, &log); + let spec = E::default_spec(); + let globals = NetworkGlobals::new_test_globals(trusted_peers, &log, spec); PeerManager::new(config, Arc::new(globals), &log).unwrap() } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index d0c6710e8a7..fdde57b4a57 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,6 +1,8 @@ use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; -use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; +use crate::{ + metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Eth2Enr, Gossipsub, PeerId, +}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; @@ -13,7 +15,7 @@ use std::{ fmt::Formatter, }; use sync_status::SyncStatus; -use types::{ChainSpec, EthSpec}; +use types::{ChainSpec, DataColumnSubnetId, EthSpec}; pub mod client; pub mod peer_info; @@ -45,10 +47,16 @@ pub struct PeerDB { disable_peer_scoring: bool, /// PeerDB's logger log: slog::Logger, + spec: ChainSpec, } impl PeerDB { - pub fn new(trusted_peers: Vec, disable_peer_scoring: bool, log: &slog::Logger) -> Self { + pub fn new( + trusted_peers: Vec, + disable_peer_scoring: bool, + log: &slog::Logger, + spec: ChainSpec, + ) -> Self { // Initialize the peers hashmap with trusted peers let peers = trusted_peers .into_iter() @@ -60,6 +68,7 @@ impl PeerDB { banned_peers_count: BannedPeersCount::default(), disable_peer_scoring, peers, + spec, } } @@ -247,6 +256,27 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + pub fn good_custody_subnet_peer( + &self, + subnet: DataColumnSubnetId, + ) -> impl Iterator { + self.peers + .iter() + .filter(move |(_, info)| { + // TODO(das): we currently consider peer to be a subnet peer if the peer is *either* + // subscribed to the subnet or assigned to the subnet. + // The first condition is currently required as we don't have custody count in + // metadata implemented yet, and therefore unable to reliably determine custody + // subnet count (ENR is not always available). + // This condition can be removed later so that we can identify peers that are not + // serving custody columns and penalise accordingly. + let is_custody_subnet_peer = info.on_subnet_gossipsub(&Subnet::DataColumn(subnet)) + || info.is_assigned_to_custody_subnet(&subnet); + info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives the ids of all known disconnected peers. pub fn disconnected_peers(&self) -> impl Iterator { self.peers @@ -676,12 +706,12 @@ impl PeerDB { /// Updates the connection state. MUST ONLY BE USED IN TESTS. pub fn __add_connected_peer_testing_only( &mut self, - peer_id: &PeerId, supernode: bool, spec: &ChainSpec, - ) -> Option { + ) -> PeerId { let enr_key = CombinedKey::generate_secp256k1(); let mut enr = Enr::builder().build(&enr_key).unwrap(); + let peer_id = enr.peer_id(); if supernode { enr.insert( @@ -693,13 +723,15 @@ impl PeerDB { } self.update_connection_state( - peer_id, + &peer_id, NewConnectionState::Connected { enr: Some(enr), seen_address: Multiaddr::empty(), direction: ConnectionDirection::Outgoing, }, - ) + ); + + peer_id } /// The connection state of the peer has been changed. Modify the peer in the db to ensure all @@ -762,8 +794,17 @@ impl PeerDB { seen_address, }, ) => { - // Update the ENR if one exists + // Update the ENR if one exists, and compute the custody subnets if let Some(enr) = enr { + let node_id = enr.node_id().raw().into(); + let custody_subnet_count = enr.custody_subnet_count::(&self.spec); + let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( + node_id, + custody_subnet_count, + &self.spec, + ) + .collect::>(); + info.set_custody_subnets(custody_subnets); info.set_enr(enr); } @@ -1314,7 +1355,8 @@ mod tests { fn get_db() -> PeerDB { let log = build_log(slog::Level::Debug, false); - PeerDB::new(vec![], false, &log) + let spec = M::default_spec(); + PeerDB::new(vec![], false, &log, spec) } #[test] @@ -2013,7 +2055,8 @@ mod tests { fn test_trusted_peers_score() { let trusted_peer = PeerId::random(); let log = build_log(slog::Level::Debug, false); - let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer], false, &log); + let spec = M::default_spec(); + let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer], false, &log, spec); pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); @@ -2037,7 +2080,8 @@ mod tests { fn test_disable_peer_scoring() { let peer = PeerId::random(); let log = build_log(slog::Level::Debug, false); - let mut pdb: PeerDB = PeerDB::new(vec![], true, &log); + let spec = M::default_spec(); + let mut pdb: PeerDB = PeerDB::new(vec![], true, &log, spec); pdb.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 0745cc26008..8a04d450ba4 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -13,7 +13,7 @@ use std::collections::HashSet; use std::net::IpAddr; use std::time::Instant; use strum::AsRefStr; -use types::EthSpec; +use types::{DataColumnSubnetId, EthSpec}; use PeerConnectionStatus::*; /// Information about a given connected peer. @@ -40,6 +40,11 @@ pub struct PeerInfo { meta_data: Option>, /// Subnets the peer is connected to. subnets: HashSet, + /// This is computed from either metadata or the ENR, and contains the subnets that the peer + /// is *assigned* to custody, rather than *connected* to (different to `self.subnets`). + /// Note: Another reason to keep this separate to `self.subnets` is an upcoming change to + /// decouple custody requirements from the actual subnets, i.e. changing this to `custody_groups`. + custody_subnets: HashSet, /// The time we would like to retain this peer. After this time, the peer is no longer /// necessary. #[serde(skip)] @@ -62,6 +67,7 @@ impl Default for PeerInfo { listening_addresses: Vec::new(), seen_multiaddrs: HashSet::new(), subnets: HashSet::new(), + custody_subnets: HashSet::new(), sync_status: SyncStatus::Unknown, meta_data: None, min_ttl: None, @@ -210,6 +216,11 @@ impl PeerInfo { self.subnets.contains(subnet) } + /// Returns if the peer is assigned to a given `DataColumnSubnetId`. + pub fn is_assigned_to_custody_subnet(&self, subnet: &DataColumnSubnetId) -> bool { + self.custody_subnets.contains(subnet) + } + /// Returns true if the peer is connected to a long-lived subnet. pub fn has_long_lived_subnet(&self) -> bool { // Check the meta_data @@ -362,6 +373,10 @@ impl PeerInfo { self.connection_status = connection_status } + pub(super) fn set_custody_subnets(&mut self, custody_subnets: HashSet) { + self.custody_subnets = custody_subnets + } + /// Sets the ENR of the peer if one is known. pub(super) fn set_enr(&mut self, enr: Enr) { self.enr = Some(enr) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index cec9f0841cc..d7af32420a7 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -172,6 +172,7 @@ impl Network { trusted_peers, config.disable_peer_scoring, &log, + ctx.chain_spec.clone(), ); Arc::new(globals) }; diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 0f99bfc5dae..12c114e8d58 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -2,9 +2,9 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; +use crate::EnrExt; use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; -use crate::{EnrExt, Subnet}; use parking_lot::RwLock; use std::collections::HashSet; use types::data_column_sidecar::ColumnIndex; @@ -27,6 +27,7 @@ pub struct NetworkGlobals { pub sync_state: RwLock, /// The current state of the backfill sync. pub backfill_state: RwLock, + spec: ChainSpec, } impl NetworkGlobals { @@ -36,16 +37,23 @@ impl NetworkGlobals { trusted_peers: Vec, disable_peer_scoring: bool, log: &slog::Logger, + spec: ChainSpec, ) -> Self { NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), listen_multiaddrs: RwLock::new(Vec::new()), local_metadata: RwLock::new(local_metadata), - peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)), + peers: RwLock::new(PeerDB::new( + trusted_peers, + disable_peer_scoring, + log, + spec.clone(), + )), gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::NotRequired), + spec, } } @@ -112,44 +120,45 @@ impl NetworkGlobals { } /// Compute custody data columns the node is assigned to custody. - pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec { + pub fn custody_columns(&self, _epoch: Epoch) -> Vec { let enr = self.local_enr(); let node_id = enr.node_id().raw().into(); - let custody_subnet_count = enr.custody_subnet_count::(spec); - DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count, spec) + let custody_subnet_count = enr.custody_subnet_count::(&self.spec); + DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count, &self.spec) .collect() } /// Compute custody data column subnets the node is assigned to custody. - pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator { + pub fn custody_subnets(&self) -> impl Iterator { let enr = self.local_enr(); let node_id = enr.node_id().raw().into(); - let custody_subnet_count = enr.custody_subnet_count::(spec); - DataColumnSubnetId::compute_custody_subnets::(node_id, custody_subnet_count, spec) + let custody_subnet_count = enr.custody_subnet_count::(&self.spec); + DataColumnSubnetId::compute_custody_subnets::(node_id, custody_subnet_count, &self.spec) } /// Returns a connected peer that: /// 1. is connected - /// 2. assigned to custody the column based on it's `custody_subnet_count` from metadata (WIP) + /// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata (WIP) /// 3. has a good score /// 4. subscribed to the specified column - this condition can be removed later, so we can /// identify and penalise peers that are supposed to custody the column. - pub fn custody_peers_for_column( - &self, - column_index: ColumnIndex, - spec: &ChainSpec, - ) -> Vec { + pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec { self.peers .read() - .good_peers_on_subnet(Subnet::DataColumn( - DataColumnSubnetId::from_column_index::(column_index as usize, spec), + .good_custody_subnet_peer(DataColumnSubnetId::from_column_index::( + column_index as usize, + &self.spec, )) .cloned() .collect::>() } /// TESTING ONLY. Build a dummy NetworkGlobals instance. - pub fn new_test_globals(trusted_peers: Vec, log: &slog::Logger) -> NetworkGlobals { + pub fn new_test_globals( + trusted_peers: Vec, + log: &slog::Logger, + spec: ChainSpec, + ) -> NetworkGlobals { use crate::CombinedKeyExt; let keypair = libp2p::identity::secp256k1::Keypair::generate(); let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair); @@ -164,6 +173,7 @@ impl NetworkGlobals { trusted_peers, false, log, + spec, ) } } @@ -180,9 +190,9 @@ mod test { let default_custody_requirement_column_count = spec.number_of_columns as u64 / spec.data_column_sidecar_subnet_count * spec.custody_requirement; - let globals = NetworkGlobals::::new_test_globals(vec![], &log); + let globals = NetworkGlobals::::new_test_globals(vec![], &log, spec.clone()); let any_epoch = Epoch::new(0); - let columns = globals.custody_columns(any_epoch, &spec); + let columns = globals.custody_columns(any_epoch); assert_eq!( columns.len(), default_custody_requirement_column_count as usize diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index a9b9f64a79d..40c69a0baa5 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -93,7 +93,7 @@ impl TestRig { spec.shard_committee_period = 2; let harness = BeaconChainHarness::builder(MainnetEthSpec) - .spec(spec) + .spec(spec.clone()) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() @@ -204,7 +204,14 @@ impl TestRig { }); let enr_key = CombinedKey::generate_secp256k1(); let enr = enr::Enr::builder().build(&enr_key).unwrap(); - let network_globals = Arc::new(NetworkGlobals::new(enr, meta_data, vec![], false, &log)); + let network_globals = Arc::new(NetworkGlobals::new( + enr, + meta_data, + vec![], + false, + &log, + spec, + )); let executor = harness.runtime.task_executor.clone(); diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 6f4eb454d16..cd363cfaee3 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -132,7 +132,11 @@ impl TestRig { let (network_tx, network_rx) = mpsc::unbounded_channel(); // TODO(das): make the generation of the ENR use the deterministic rng to have consistent // column assignments - let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); + let globals = Arc::new(NetworkGlobals::new_test_globals( + Vec::new(), + &log, + chain.spec.clone(), + )); let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals, chain.clone(), @@ -385,21 +389,17 @@ impl TestRig { } fn new_connected_peer(&mut self) -> PeerId { - let peer_id = PeerId::random(); self.network_globals .peers .write() - .__add_connected_peer_testing_only(&peer_id, false, &self.harness.spec); - peer_id + .__add_connected_peer_testing_only(false, &self.harness.spec) } fn new_connected_supernode_peer(&mut self) -> PeerId { - let peer_id = PeerId::random(); self.network_globals .peers .write() - .__add_connected_peer_testing_only(&peer_id, true, &self.harness.spec); - peer_id + .__add_connected_peer_testing_only(true, &self.harness.spec) } fn new_connected_peers_for_peerdas(&mut self) { @@ -972,8 +972,8 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected blob parent request for {for_block:?}: {e}")) } - /// Retrieves an unknown number of requests for data columns of `block_root`. Because peer enrs - /// are random, and peer selection is random the total number of batches requests in unknown. + /// Retrieves an unknown number of requests for data columns of `block_root`. Because peer ENRs + /// are random, and peer selection is random, the total number of batched requests is unknown. fn expect_data_columns_by_root_requests( &mut self, block_root: Hash256, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 5963401faa8..f35ee145b19 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -276,7 +276,7 @@ impl SyncNetworkContext { // TODO(das): epoch argument left here in case custody rotation is implemented pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec { self.network_globals() - .custody_peers_for_column(column_index, &self.chain.spec) + .custody_peers_for_column(column_index) } pub fn get_random_custodial_peer( @@ -382,9 +382,7 @@ impl SyncNetworkContext { let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let custody_indexes = self - .network_globals() - .custody_columns(epoch, &self.chain.spec); + let custody_indexes = self.network_globals().custody_columns(epoch); for (peer_id, columns_by_range_request) in self.make_columns_by_range_requests(epoch, request, &custody_indexes)? @@ -755,9 +753,7 @@ impl SyncNetworkContext { // TODO(das): figure out how to pass block.slot if we end up doing rotation let block_epoch = Epoch::new(0); - let custody_indexes_duty = self - .network_globals() - .custody_columns(block_epoch, &self.chain.spec); + let custody_indexes_duty = self.network_globals().custody_columns(block_epoch); // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = custody_indexes_duty diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 57d269c104c..aece5737cc9 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -10,7 +10,7 @@ use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_metrics::set_int_gauge; use lighthouse_network::service::api_types::Id; -use lighthouse_network::{PeerAction, PeerId, Subnet}; +use lighthouse_network::{PeerAction, PeerId}; use rand::seq::SliceRandom; use rand::Rng; use slog::{crit, debug, o, warn}; @@ -1115,24 +1115,25 @@ impl SyncingChain { fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all custody column subnets before sending batches - let peers_on_all_custody_subnets = network - .network_globals() - .custody_subnets(&network.chain.spec) - .all(|subnet_id| { - let peer_count = network - .network_globals() - .peers - .read() - .good_peers_on_subnet(Subnet::DataColumn(subnet_id)) - .count(); - - set_int_gauge( - &PEERS_PER_COLUMN_SUBNET, - &[&subnet_id.to_string()], - peer_count as i64, - ); - peer_count > 0 - }); + let peers_on_all_custody_subnets = + network + .network_globals() + .custody_subnets() + .all(|subnet_id| { + let peer_count = network + .network_globals() + .peers + .read() + .good_custody_subnet_peer(subnet_id) + .count(); + + set_int_gauge( + &PEERS_PER_COLUMN_SUBNET, + &[&subnet_id.to_string()], + peer_count as i64, + ); + peer_count > 0 + }); peers_on_all_custody_subnets } else { true diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 334c58090e2..c8bb9b3b09a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -689,7 +689,11 @@ mod tests { log.new(o!("component" => "range")), ); let (network_tx, network_rx) = mpsc::unbounded_channel(); - let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); + let globals = Arc::new(NetworkGlobals::new_test_globals( + Vec::new(), + &log, + chain.spec.clone(), + )); let (network_beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals.clone(),