From 480fbff0b52a2c5043a333ad6d3d571ecc3cb127 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Mon, 13 Jun 2022 16:54:06 +0200 Subject: [PATCH 1/4] store: take `&[u8]` rather than vector in `gc_col` (#7019) `gc_col` does not take advantage of key argument being a vector. Passing that argument as a slice is perfectly viable and avoids bunch of unnecessary memory allocations. Issue: https://github.com/near/nearcore/issues/6583 --- chain/chain/src/store.rs | 65 ++++++++++++++++---------------- core/primitives-core/src/hash.rs | 4 ++ core/primitives/src/sharding.rs | 6 +++ 3 files changed, 42 insertions(+), 33 deletions(-) diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 1b564764f4a..02a8d5ea7dc 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -1962,30 +1962,30 @@ impl<'a> ChainStoreUpdate<'a> { let chunk = self.get_chunk(&chunk_hash)?.clone(); debug_assert_eq!(chunk.cloned_header().height_created(), height); for transaction in chunk.transactions() { - self.gc_col(DBCol::Transactions, &transaction.get_hash().into()); + self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); } for receipt in chunk.receipts() { - self.gc_col(DBCol::Receipts, &receipt.get_hash().into()); + self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes()); } // 2. Delete chunk_hash-indexed data - let chunk_header_hash = chunk_hash.clone().into(); - self.gc_col(DBCol::Chunks, &chunk_header_hash); - self.gc_col(DBCol::PartialChunks, &chunk_header_hash); - self.gc_col(DBCol::InvalidChunks, &chunk_header_hash); + let chunk_hash = chunk_hash.as_bytes(); + self.gc_col(DBCol::Chunks, chunk_hash); + self.gc_col(DBCol::PartialChunks, chunk_hash); + self.gc_col(DBCol::InvalidChunks, chunk_hash); } let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?; for _header_hash in header_hashes { // 3. Delete header_hash-indexed data // TODO #3488: enable - //self.gc_col(DBCol::BlockHeader, &header_hash.into()); + //self.gc_col(DBCol::BlockHeader, header_hash.as_bytes()); } // 4. Delete chunks_tail-related data - let key = &index_to_bytes(height).to_vec(); - self.gc_col(DBCol::ChunkHashesByHeight, key); - self.gc_col(DBCol::HeaderHashesByHeight, key); + let key = index_to_bytes(height); + self.gc_col(DBCol::ChunkHashesByHeight, &key); + self.gc_col(DBCol::HeaderHashesByHeight, &key); } self.update_chunk_tail(min_chunk_height); Ok(()) @@ -2019,12 +2019,12 @@ impl<'a> ChainStoreUpdate<'a> { if !chunk_hashes.is_empty() { remaining -= 1; for chunk_hash in chunk_hashes { - let chunk_header_hash = chunk_hash.into(); - self.gc_col(DBCol::PartialChunks, &chunk_header_hash); + let chunk_hash = chunk_hash.as_bytes(); + self.gc_col(DBCol::PartialChunks, chunk_hash); // Data in DBCol::InvalidChunks isn’t technically redundant (it // cannot be calculated from other data) but it is data we // don’t need for anything so it can be deleted as well. - self.gc_col(DBCol::InvalidChunks, &chunk_header_hash); + self.gc_col(DBCol::InvalidChunks, chunk_hash); } } } @@ -2151,29 +2151,28 @@ impl<'a> ChainStoreUpdate<'a> { } // 3. Delete block_hash-indexed data - let block_hash_vec: Vec = block_hash.as_ref().into(); - self.gc_col(DBCol::Block, &block_hash_vec); - self.gc_col(DBCol::BlockExtra, &block_hash_vec); - self.gc_col(DBCol::NextBlockHashes, &block_hash_vec); - self.gc_col(DBCol::ChallengedBlocks, &block_hash_vec); - self.gc_col(DBCol::BlocksToCatchup, &block_hash_vec); + self.gc_col(DBCol::Block, block_hash.as_bytes()); + self.gc_col(DBCol::BlockExtra, block_hash.as_bytes()); + self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes()); + self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes()); + self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes()); let storage_key = KeyForStateChanges::for_block(&block_hash); - let stored_state_changes: Vec> = self + let stored_state_changes: Vec> = self .chain_store .store() .iter_prefix(DBCol::StateChanges, storage_key.as_ref()) - .map(|key| key.0.into()) + .map(|(key, _)| key) .collect(); for key in stored_state_changes { self.gc_col(DBCol::StateChanges, &key); } - self.gc_col(DBCol::BlockRefCount, &block_hash_vec); + self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes()); self.gc_outcomes(&block)?; match gc_mode { GCMode::StateSync { clear_block_info: false } => {} - _ => self.gc_col(DBCol::BlockInfo, &block_hash_vec), + _ => self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()), } - self.gc_col(DBCol::StateDlInfos, &block_hash_vec); + self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes()); // 4. Update or delete block_hash_per_height self.gc_col_block_per_height(&block_hash, height, block.header().epoch_id())?; @@ -2227,17 +2226,17 @@ impl<'a> ChainStoreUpdate<'a> { if hashes.is_empty() { epoch_to_hashes.remove(epoch_id); } - let key = index_to_bytes(height).to_vec(); + let key = &index_to_bytes(height)[..]; if epoch_to_hashes.is_empty() { - store_update.delete(DBCol::BlockPerHeight, &key); - self.chain_store.block_hash_per_height.pop(&key); + store_update.delete(DBCol::BlockPerHeight, key); + self.chain_store.block_hash_per_height.pop(key); } else { - store_update.set_ser(DBCol::BlockPerHeight, &key, &epoch_to_hashes)?; - self.chain_store.block_hash_per_height.put(key.clone(), Arc::new(epoch_to_hashes)); + store_update.set_ser(DBCol::BlockPerHeight, key, &epoch_to_hashes)?; + self.chain_store.block_hash_per_height.put(key.to_vec(), Arc::new(epoch_to_hashes)); } self.inc_gc(DBCol::BlockPerHeight); if self.is_height_processed(height)? { - self.gc_col(DBCol::ProcessedBlockHeights, &key); + self.gc_col(DBCol::ProcessedBlockHeights, key); } self.merge(store_update); Ok(()) @@ -2303,11 +2302,11 @@ impl<'a> ChainStoreUpdate<'a> { let mut outcomes_with_id = self.chain_store.get_outcomes_by_id(&outcome_id)?; outcomes_with_id.retain(|outcome| &outcome.block_hash != block_hash); if outcomes_with_id.is_empty() { - self.gc_col(DBCol::TransactionResult, &outcome_id.as_ref().into()); + self.gc_col(DBCol::TransactionResult, outcome_id.as_bytes()); } else { store_update.set_ser( DBCol::TransactionResult, - outcome_id.as_ref(), + outcome_id.as_bytes(), &outcomes_with_id, )?; } @@ -2318,7 +2317,7 @@ impl<'a> ChainStoreUpdate<'a> { Ok(()) } - fn gc_col(&mut self, col: DBCol, key: &Vec) { + fn gc_col(&mut self, col: DBCol, key: &[u8]) { assert!(col.is_gc()); let mut store_update = self.store().store_update(); match col { diff --git a/core/primitives-core/src/hash.rs b/core/primitives-core/src/hash.rs index 9067244f768..475528f207b 100644 --- a/core/primitives-core/src/hash.rs +++ b/core/primitives-core/src/hash.rs @@ -24,6 +24,10 @@ impl CryptoHash { BorshSerialize::serialize(value, &mut hasher).unwrap(); CryptoHash(hasher.finalize().into()) } + + pub fn as_bytes(&self) -> &[u8; 32] { + &self.0 + } } impl Default for CryptoHash { diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 48e978ddd26..83db0c2ef5f 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -34,6 +34,12 @@ use std::sync::Arc; )] pub struct ChunkHash(pub CryptoHash); +impl ChunkHash { + pub fn as_bytes(&self) -> &[u8; 32] { + self.0.as_bytes() + } +} + impl AsRef<[u8]> for ChunkHash { fn as_ref(&self) -> &[u8] { self.0.as_ref() From a817296e8a7cff553276538f2f4344b0521211fc Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Mon, 13 Jun 2022 17:32:22 +0200 Subject: [PATCH 2/4] view-state: fix rocksdb-stats command (#7016) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sst_dump’s --file argument requires a path to a single SST file. Alas, get_rocksdb_stats tried to pass it path to the directory containing the entire RocksDB database. This unsurprisingly lead to failures such as: $ ./target/debug/neard view-state rocksdb-stats .... thread 'main' panicked at 'Couldn't get RocksDB stats: failed to run sst_dump, exit status: 1, stderr: No valid SST files found in /home/mpn/.near/data Fix the function by iterating over SST files in the directory. While at it, take configuration read from config.json into account when figuring out the path to the database. --- tools/state-viewer/src/cli.rs | 12 +- tools/state-viewer/src/rocksdb_stats.rs | 145 +++++++++++++----------- 2 files changed, 82 insertions(+), 75 deletions(-) diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 85aba338ab9..53e63d4528e 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -70,9 +70,9 @@ impl StateViewerSubCommand { pub fn run(self, home_dir: &Path, genesis_validation: GenesisValidationMode, readwrite: bool) { let near_config = load_config(home_dir, genesis_validation) .unwrap_or_else(|e| panic!("Error loading config: {:#}", e)); - let store = near_store::Store::opener(home_dir, &near_config.config.store) - .read_only(!readwrite) - .open(); + let store_opener = + near_store::Store::opener(home_dir, &near_config.config.store).read_only(!readwrite); + let store = store_opener.open(); match self { StateViewerSubCommand::Peers => peers(store), StateViewerSubCommand::State => state(home_dir, near_config, store), @@ -87,7 +87,7 @@ impl StateViewerSubCommand { StateViewerSubCommand::DumpCode(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::DumpAccountStorage(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::EpochInfo(cmd) => cmd.run(home_dir, near_config, store), - StateViewerSubCommand::RocksDBStats(cmd) => cmd.run(home_dir), + StateViewerSubCommand::RocksDBStats(cmd) => cmd.run(&store_opener.get_path()), StateViewerSubCommand::Receipts(cmd) => cmd.run(near_config, store), StateViewerSubCommand::Chunks(cmd) => cmd.run(near_config, store), StateViewerSubCommand::PartialChunks(cmd) => cmd.run(near_config, store), @@ -320,8 +320,8 @@ pub struct RocksDBStatsCmd { } impl RocksDBStatsCmd { - pub fn run(self, home_dir: &Path) { - get_rocksdb_stats(home_dir, self.file).expect("Couldn't get RocksDB stats"); + pub fn run(self, store_dir: &Path) { + get_rocksdb_stats(store_dir, self.file).expect("Couldn't get RocksDB stats"); } } diff --git a/tools/state-viewer/src/rocksdb_stats.rs b/tools/state-viewer/src/rocksdb_stats.rs index dfe3e7bc50e..d0a0ac4dc56 100644 --- a/tools/state-viewer/src/rocksdb_stats.rs +++ b/tools/state-viewer/src/rocksdb_stats.rs @@ -4,7 +4,7 @@ use std::path::{Path, PathBuf}; use std::process::Command; #[derive(Serialize, Debug)] -struct Data { +struct SSTFileData { col: String, entries: u64, estimated_table_size: u64, @@ -15,55 +15,65 @@ struct Data { } // SST file dump keys we use to collect statistics. -const SST_FILE_DUMP_LINES: &[&str] = &[ - "column family name", - "# entries", - "(estimated) table size", - "raw key size", - "raw value size", -]; - -impl Data { - pub fn from_sst_file_dump(lines: &[&str]) -> anyhow::Result { +const SST_FILE_DUMP_LINES: [&str; 5] = + ["column family name", "# entries", "(estimated) table size", "raw key size", "raw value size"]; + +/// Statistics about a single SST file. +impl SSTFileData { + fn for_sst_file(file_path: &Path) -> anyhow::Result { + let mut file_arg = std::ffi::OsString::from("--file="); + file_arg.push(file_path); + + let mut cmd = Command::new("sst_dump"); + // For some reason, adding --command=none makes execution 20x faster. + cmd.arg(file_arg).arg("--show_properties").arg("--command=none"); + + let output = cmd.output()?; + anyhow::ensure!( + output.status.success(), + "failed to run sst_dump, {}, stderr: {}", + output.status, + String::from_utf8_lossy(&output.stderr) + ); + + Self::from_sst_file_dump(std::str::from_utf8(&output.stdout).unwrap()) + } + + fn from_sst_file_dump(output: &str) -> anyhow::Result { // Mapping from SST file dump key to value. - let mut values: HashMap<&str, &str> = Default::default(); + let mut values: HashMap<&str, Option<&str>> = + SST_FILE_DUMP_LINES.iter().map(|key| (*key, None)).collect(); - for line in lines { - let split_result = line.split_once(':'); - let (line, value) = match split_result { + for line in output.lines() { + let (key, value) = match line.split_once(':') { None => continue, - Some((prefix, suffix)) => (prefix.trim(), suffix.trim()), + Some((key, value)) => (key.trim(), value.trim()), }; - for &sst_file_line in SST_FILE_DUMP_LINES { - if line == sst_file_line { - let prev = values.insert(line, value); - if prev.is_some() { - anyhow::bail!( - "Line {} was presented twice and contains values {} and {}", - line, - prev.unwrap(), - value - ); - } + if let Some(entry) = values.get_mut(key) { + if let Some(prev) = entry { + anyhow::bail!( + "Key {key} was presented twice and \ + contains values {prev} and {value}" + ); } + let _ = entry.insert(value); } } - Ok(Data { - col: String::from(values.get(SST_FILE_DUMP_LINES[0]).unwrap().clone()), - entries: values.get(SST_FILE_DUMP_LINES[1]).unwrap().parse::().unwrap(), - estimated_table_size: values - .get(SST_FILE_DUMP_LINES[2]) - .unwrap() - .parse::() - .unwrap(), - raw_key_size: values.get(SST_FILE_DUMP_LINES[3]).unwrap().parse::().unwrap(), - raw_value_size: values.get(SST_FILE_DUMP_LINES[4]).unwrap().parse::().unwrap(), + let get_u64 = + |idx| values.get(SST_FILE_DUMP_LINES[idx]).unwrap().unwrap().parse::().unwrap(); + + Ok(SSTFileData { + col: values.get(SST_FILE_DUMP_LINES[0]).unwrap().unwrap().to_owned(), + entries: get_u64(1), + estimated_table_size: get_u64(2), + raw_key_size: get_u64(3), + raw_value_size: get_u64(4), }) } - pub fn merge(&mut self, other: &Self) { + fn merge(&mut self, other: &Self) { self.entries += other.entries; self.estimated_table_size += other.estimated_table_size; self.raw_key_size += other.raw_key_size; @@ -71,43 +81,40 @@ impl Data { } } -pub fn get_rocksdb_stats(home_dir: &Path, file: Option) -> anyhow::Result<()> { - let store_dir = near_store::get_store_path(&home_dir); - let mut cmd = Command::new("sst_dump"); - cmd.arg(format!("--file={}", store_dir.to_str().unwrap())) - .arg("--show_properties") - .arg("--command=none"); // For some reason, adding this argument makes execution 20x faster - eprintln!("Running {:?} ...", cmd); - let output = cmd.output()?; - if !output.status.success() { - anyhow::bail!( - "failed to run sst_dump, {}, stderr: {}", - output.status, - String::from_utf8_lossy(&output.stderr) - ); +/// Merged statistics for all columns. +struct ColumnsData(HashMap); + +impl ColumnsData { + fn new() -> Self { + Self(Default::default()) } - eprintln!("Parsing output ..."); - let out = std::str::from_utf8(&output.stdout).unwrap(); - let lines: Vec<&str> = out.lines().collect(); - let mut column_data: HashMap = HashMap::new(); - for sst_file_slice in lines.split(|line| line.contains("Process")).skip(1) { - if sst_file_slice.is_empty() { - continue; - } - let data = Data::from_sst_file_dump(sst_file_slice)?; - if let Some(x) = column_data.get_mut(&data.col) { - x.merge(&data); - } else { - column_data.insert(data.col.clone(), data); - } + fn add_sst_data(&mut self, data: SSTFileData) { + self.0.entry(data.col.clone()).and_modify(|entry| entry.merge(&data)).or_insert(data); } - let mut column_data_list: Vec<&Data> = column_data.values().collect(); - column_data_list.sort_by_key(|data| std::cmp::Reverse(data.estimated_table_size)); - let result = serde_json::to_string_pretty(&column_data_list).unwrap(); + fn into_vec(self) -> Vec { + let mut values: Vec<_> = self.0.into_values().collect(); + values.sort_by_key(|data| std::cmp::Reverse(data.estimated_table_size)); + values + } +} + +pub fn get_rocksdb_stats(store_dir: &Path, file: Option) -> anyhow::Result<()> { + let mut data = ColumnsData::new(); + + for entry in std::fs::read_dir(store_dir)? { + let entry = entry?; + let file_name = std::path::PathBuf::from(entry.file_name()); + if file_name.extension().map_or(false, |ext| ext == "sst") { + let file_path = store_dir.join(file_name); + eprintln!("Processing ‘{}’...", file_path.display()); + data.add_sst_data(SSTFileData::for_sst_file(&file_path)?); + } + } eprintln!("Dumping stats ..."); + let result = serde_json::to_string_pretty(&data.into_vec()).unwrap(); match file { None => println!("{}", result), Some(file) => std::fs::write(file, result)?, From e289bd55beadc588f695f3eb7fa62909d7d2ccb8 Mon Sep 17 00:00:00 2001 From: pompon0 Date: Mon, 13 Jun 2022 17:59:48 +0200 Subject: [PATCH 3/4] Removed the abandoned IBF support for the routing table (#7014) Co-authored-by: near-bulldozer[bot] <73298989+near-bulldozer[bot]@users.noreply.github.com> --- chain/client/src/test_utils.rs | 1 - chain/jsonrpc/src/lib.rs | 21 -- chain/network/src/network_protocol/borsh.rs | 35 +-- .../src/network_protocol/borsh_conv.rs | 7 +- chain/network/src/network_protocol/mod.rs | 6 +- .../src/network_protocol/network.proto | 4 +- .../src/network_protocol/proto_conv.rs | 17 +- chain/network/src/peer/peer_actor.rs | 19 +- .../src/peer_manager/peer_manager_actor.rs | 241 ++-------------- chain/network/src/routing/ibf.rs | 250 ----------------- chain/network/src/routing/ibf_peer_set.rs | 265 ------------------ chain/network/src/routing/ibf_set.rs | 139 --------- chain/network/src/routing/mod.rs | 5 - .../src/routing/routing_table_actor.rs | 205 -------------- chain/network/src/test_utils.rs | 1 - chain/network/src/types.rs | 23 +- 16 files changed, 44 insertions(+), 1195 deletions(-) delete mode 100644 chain/network/src/routing/ibf.rs delete mode 100644 chain/network/src/routing/ibf_peer_set.rs delete mode 100644 chain/network/src/routing/ibf_set.rs diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index b3119765106..9700fcbf79a 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -984,7 +984,6 @@ pub fn setup_mock_all_validators( | NetworkRequests::RequestUpdateNonce(_, _) | NetworkRequests::ResponseUpdateNonce(_) | NetworkRequests::ReceiptOutComeRequest(_, _) => {} - | NetworkRequests::IbfMessage { .. } => {} }; } Box::new(Some(resp)) diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs index bfaa38517ab..ee23a7c3225 100644 --- a/chain/jsonrpc/src/lib.rs +++ b/chain/jsonrpc/src/lib.rs @@ -320,27 +320,6 @@ impl JsonRpcHandler { .map_err(|err| RpcError::serialization_error(err.to_string())), ) } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - "adv_start_routing_table_syncv2" => { - let params = parse_params::< - near_jsonrpc_adversarial_primitives::StartRoutingTableSyncRequest, - >(params)?; - - self.peer_manager_addr - .send( - near_network::types::PeerManagerMessageRequest::StartRoutingTableSync( - near_network::private_actix::StartRoutingTableSync { - peer_id: params.peer_id, - }, - ), - ) - .await - .map_err(RpcError::rpc_from)?; - Some( - serde_json::to_value(()) - .map_err(|err| RpcError::serialization_error(err.to_string())), - ) - } "adv_get_peer_id" => { let response = self .peer_manager_addr diff --git a/chain/network/src/network_protocol/borsh.rs b/chain/network/src/network_protocol/borsh.rs index b0a9374b67c..0d073ecacc9 100644 --- a/chain/network/src/network_protocol/borsh.rs +++ b/chain/network/src/network_protocol/borsh.rs @@ -159,40 +159,7 @@ pub enum PeerMessage { EpochSyncFinalizationRequest(EpochId), EpochSyncFinalizationResponse(Box), - RoutingTableSyncV2(RoutingSyncV2), + _RoutingTableSyncV2, } #[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready const _: () = assert!(std::mem::size_of::() <= 1144, "PeerMessage > 1144 bytes"); - -#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub enum RoutingSyncV2 { - Version2(RoutingVersion2), -} -const _: () = assert!(std::mem::size_of::() <= 80, "RoutingSyncV2 > 80 bytes"); - -#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub struct PartialSync { - pub(crate) ibf_level: crate::routing::ibf_peer_set::ValidIBFLevel, - pub(crate) ibf: Vec, -} - -#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub enum RoutingState { - PartialSync(PartialSync), - RequestAllEdges, - Done, - RequestMissingEdges(Vec), - InitializeIbf, -} - -#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] -pub struct RoutingVersion2 { - pub(crate) known_edges: u64, - pub(crate) seed: u64, - pub(crate) edges: Vec, - pub(crate) routing_state: RoutingState, -} diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs index 0c4d6d28812..f84fafeed33 100644 --- a/chain/network/src/network_protocol/borsh_conv.rs +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -79,6 +79,8 @@ impl From<&mem::HandshakeFailureReason> for net::HandshakeFailureReason { pub enum ParsePeerMessageError { #[error("HandshakeV2 is deprecated")] DeprecatedHandshakeV2, + #[error("RoutingTableSyncV2 is deprecated")] + DeprecatedRoutingTableSyncV2, } impl TryFrom<&net::PeerMessage> for mem::PeerMessage { @@ -116,7 +118,9 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage { net::PeerMessage::EpochSyncFinalizationResponse(esfr) => { mem::PeerMessage::EpochSyncFinalizationResponse(esfr) } - net::PeerMessage::RoutingTableSyncV2(rs) => mem::PeerMessage::RoutingTableSyncV2(rs), + net::PeerMessage::_RoutingTableSyncV2 => { + return Err(Self::Error::DeprecatedRoutingTableSyncV2) + } }) } } @@ -154,7 +158,6 @@ impl From<&mem::PeerMessage> for net::PeerMessage { mem::PeerMessage::EpochSyncFinalizationResponse(esfr) => { net::PeerMessage::EpochSyncFinalizationResponse(esfr) } - mem::PeerMessage::RoutingTableSyncV2(rs) => net::PeerMessage::RoutingTableSyncV2(rs), } } } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 5607ebbdca1..4a023da8bbc 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -25,9 +25,7 @@ use protobuf::Message as _; use std::fmt; use thiserror::Error; -pub use self::borsh::{ - PartialSync, RoutingState, RoutingSyncV2, RoutingTableUpdate, RoutingVersion2, -}; +pub use self::borsh::RoutingTableUpdate; /// Structure representing handshake between peers. #[derive(PartialEq, Eq, Clone, Debug)] @@ -107,8 +105,6 @@ pub enum PeerMessage { EpochSyncResponse(Box), EpochSyncFinalizationRequest(EpochId), EpochSyncFinalizationResponse(Box), - - RoutingTableSyncV2(RoutingSyncV2), } impl fmt::Display for PeerMessage { diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto index a6bf91a3548..2eaf528bfd7 100644 --- a/chain/network/src/network_protocol/network.proto +++ b/chain/network/src/network_protocol/network.proto @@ -312,6 +312,8 @@ message PeerMessage { // between borsh and protobuf encodings: // https://docs.google.com/document/d/1gCWmt9O-h_-5JDXIqbKxAaSS3Q9pryB1f9DDY1mMav4/edit reserved 1,2,3; + // Deprecated fields. + reserved 24; oneof message_type { Handshake handshake = 4; @@ -340,7 +342,5 @@ message PeerMessage { EpochSyncResponse epoch_sync_response = 21; EpochSyncFinalizationRequest epoch_sync_finalization_request = 22; EpochSyncFinalizationResponse epoch_sync_finalization_response = 23; - - RoutingSyncV2 routing_table_sync_v2 = 24; } } diff --git a/chain/network/src/network_protocol/proto_conv.rs b/chain/network/src/network_protocol/proto_conv.rs index 6279b89aceb..e7372290776 100644 --- a/chain/network/src/network_protocol/proto_conv.rs +++ b/chain/network/src/network_protocol/proto_conv.rs @@ -1,9 +1,7 @@ /// Contains protobuf <-> network_protocol conversions. use crate::network_protocol::proto; use crate::network_protocol::proto::peer_message::Message_type as ProtoMT; -use crate::network_protocol::{ - Handshake, HandshakeFailureReason, PeerMessage, RoutingSyncV2, RoutingTableUpdate, -}; +use crate::network_protocol::{Handshake, HandshakeFailureReason, PeerMessage, RoutingTableUpdate}; use borsh::{BorshDeserialize as _, BorshSerialize as _}; use near_network_primitives::types::{ Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RoutedMessage, @@ -484,12 +482,6 @@ impl From<&PeerMessage> for proto::PeerMessage { ..Default::default() }) } - PeerMessage::RoutingTableSyncV2(rs) => { - ProtoMT::RoutingTableSyncV2(proto::RoutingSyncV2 { - borsh: rs.try_to_vec().unwrap(), - ..Default::default() - }) - } }), ..Default::default() } @@ -501,7 +493,6 @@ pub type ParseRoutedError = borsh::maybestd::io::Error; pub type ParseChallengeError = borsh::maybestd::io::Error; pub type ParseEpochSyncResponseError = borsh::maybestd::io::Error; pub type ParseEpochSyncFinalizationResponseError = borsh::maybestd::io::Error; -pub type ParseRoutingTableSyncV2Error = borsh::maybestd::io::Error; #[derive(Error, Debug)] pub enum ParsePeerMessageError { @@ -543,8 +534,6 @@ pub enum ParsePeerMessageError { EpochSyncFinalizationRequest(ParseRequiredError), #[error("epoch_sync_finalization_response: {0}")] EpochSyncFinalizationResponse(ParseEpochSyncFinalizationResponseError), - #[error("routing_table_sync_v2")] - RoutingTableSyncV2(ParseRoutingTableSyncV2Error), } impl TryFrom<&proto::PeerMessage> for PeerMessage { @@ -616,10 +605,6 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage { .map_err(Self::Error::EpochSyncFinalizationResponse)?, )) } - ProtoMT::RoutingTableSyncV2(rts) => PeerMessage::RoutingTableSyncV2( - RoutingSyncV2::try_from_slice(&rts.borsh) - .map_err(Self::Error::RoutingTableSyncV2)?, - ), }) } } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 51b1f18b07e..ee6e56b5ef9 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -547,8 +547,7 @@ impl PeerActor { | PeerMessage::BlockRequest(_) | PeerMessage::BlockHeadersRequest(_) | PeerMessage::EpochSyncRequest(_) - | PeerMessage::EpochSyncFinalizationRequest(_) - | PeerMessage::RoutingTableSyncV2(_) => { + | PeerMessage::EpochSyncFinalizationRequest(_) => { error!(target: "network", "Peer receive_client_message received unexpected type: {:?}", msg); return; } @@ -1011,22 +1010,6 @@ impl StreamHandler, ReasonForBan>> for PeerActor { Some(self.throttle_controller.clone()), )); } - (PeerStatus::Ready, PeerMessage::RoutingTableSyncV2(ibf_message)) - if cfg!(feature = "protocol_feature_routing_exchange_algorithm") => - { - // TODO(#5155) Add wrapper to be something like this for all messages. - // self.peer_manager_addr.do_send(ActixMessageWrapper::new( - // self.rate_limiter.clone, NetworkRequests::IbfMessage { - // ... - - self.peer_manager_wrapper_addr.do_send(ActixMessageWrapper::new_without_size( - PeerManagerMessageRequest::NetworkRequests(NetworkRequests::IbfMessage { - peer_id: self.other_peer_id().unwrap().clone(), - ibf_msg: ibf_message, - }), - Some(self.throttle_controller.clone()), - )); - } (PeerStatus::Ready, PeerMessage::Routed(routed_message)) => { trace!(target: "network", "Received routed message from {} to {:?}.", self.peer_info, routed_message.target); diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index f2511395d2f..d3071bded72 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -10,7 +10,8 @@ use crate::routing::edge_validator_actor::EdgeValidatorHelper; use crate::routing::routing_table_actor::{ Prune, RoutingTableActor, RoutingTableMessages, RoutingTableMessagesResponse, }; -use crate::routing::routing_table_view::{RoutingTableView, DELETE_PEERS_AFTER_TIME}; +use crate::routing::routing_table_view::RoutingTableView; +use crate::routing::routing_table_view::DELETE_PEERS_AFTER_TIME; use crate::stats::metrics; use crate::stats::metrics::{NetworkMetrics, PARTIAL_ENCODED_CHUNK_REQUEST_DELAY}; use crate::types::{ @@ -23,7 +24,6 @@ use actix::{ Recipient, Running, StreamHandler, WrapFuture, }; use anyhow::bail; -use futures::FutureExt; use near_network_primitives::types::{ AccountOrPeerIdOrHash, Ban, Edge, InboundTcpConnect, KnownPeerStatus, KnownProducer, NetworkConfig, NetworkViewClientMessages, NetworkViewClientResponses, OutboundTcpConnect, @@ -33,7 +33,6 @@ use near_network_primitives::types::{ use near_network_primitives::types::{EdgeState, PartialEdgeInfo}; use near_performance_metrics::framed_write::FramedWrite; use near_performance_metrics_macros::perf; -use near_primitives::checked_feature; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::time::Clock; @@ -121,11 +120,8 @@ struct ConnectedPeer { #[derive(Default)] struct AdvHelper { - #[cfg(feature = "test_features")] adv_disable_edge_propagation: bool, - #[cfg(feature = "test_features")] adv_disable_edge_signature_verification: bool, - #[cfg(feature = "test_features")] adv_disable_edge_pruning: bool, } @@ -556,62 +552,6 @@ impl PeerManagerActor { }); } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - fn initialize_routing_table_exchange( - peer_id: PeerId, - peer_type: PeerType, - addr: Addr, - ctx: &mut Context, - throttle_controller: Option, - ) { - let throttle_controller_clone = throttle_controller.clone(); - near_performance_metrics::actix::run_later(ctx, WAIT_FOR_SYNC_DELAY, move |act, ctx| { - if peer_type == PeerType::Inbound { - act.routing_table_addr - .send(ActixMessageWrapper::new_without_size( - RoutingTableMessages::AddPeerIfMissing(peer_id, None), - throttle_controller, - )) - .into_actor(act) - .map(move |response, act, _ctx| match response.map(|x| x.into_inner()) { - Ok(RoutingTableMessagesResponse::AddPeerResponse { seed }) => { - act.start_routing_table_syncv2(addr, seed, throttle_controller_clone) - } - _ => error!(target: "network", "expected AddIbfSetResponse"), - }) - .spawn(ctx); - } - }); - } - - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - fn start_routing_table_syncv2( - &self, - addr: Addr, - seed: u64, - throttle_controller: Option, - ) { - actix::spawn( - self.routing_table_addr - .send(ActixMessageWrapper::new_without_size( - RoutingTableMessages::StartRoutingTableSync { seed }, - throttle_controller, - )) - .then(move |response| match response.map(|r| r.into_inner()) { - Ok(RoutingTableMessagesResponse::StartRoutingTableSyncResponse(response)) => { - addr.do_send(SendMessage { - message: crate::types::PeerMessage::RoutingTableSyncV2(response), - }); - futures::future::ready(()) - } - _ => { - error!(target: "network", "expected StartRoutingTableSyncResponse"); - futures::future::ready(()) - } - }), - ); - } - /// Register a direct connection to a new peer. This will be called after successfully /// establishing a connection with another peer. It become part of the connected peers. /// @@ -625,12 +565,10 @@ impl PeerManagerActor { partial_edge_info: PartialEdgeInfo, peer_type: PeerType, addr: Addr, - peer_protocol_version: ProtocolVersion, + _peer_protocol_version: ProtocolVersion, throttle_controller: ThrottleController, ctx: &mut Context, ) { - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - let peer_id = full_peer_info.peer_info.id.clone(); debug!(target: "network", ?full_peer_info, "Consolidated connection"); if self.outgoing_peers.contains(&full_peer_info.peer_info.id) { @@ -669,65 +607,32 @@ impl PeerManagerActor { self.add_verified_edges_to_routing_table(vec![new_edge.clone()]); - checked_feature!( - "protocol_feature_routing_exchange_algorithm", - RoutingExchangeAlgorithm, - peer_protocol_version, - { - Self::initialize_routing_table_exchange( - peer_id, - peer_type, - addr.clone(), - ctx, + let network_metrics = self.network_metrics.clone(); + near_performance_metrics::actix::run_later(ctx, WAIT_FOR_SYNC_DELAY, move |act, ctx| { + act.routing_table_addr + .send(ActixMessageWrapper::new_without_size( + RoutingTableMessages::RequestRoutingTable, Some(throttle_controller), - ); - Self::send_sync( - self.network_metrics.clone(), - peer_type, - addr, - ctx, - target_peer_id, - new_edge, - Vec::new(), - ); - }, - { - let network_metrics = self.network_metrics.clone(); - near_performance_metrics::actix::run_later( - ctx, - WAIT_FOR_SYNC_DELAY, - move |act, ctx| { - act.routing_table_addr - .send(ActixMessageWrapper::new_without_size( - RoutingTableMessages::RequestRoutingTable, - Some(throttle_controller), - )) - .into_actor(act) - .map(move |response, _act, ctx| { - match response.map(|r| r.into_inner()) { - Ok( - RoutingTableMessagesResponse::RequestRoutingTableResponse { - edges_info: routing_table, - }, - ) => { - Self::send_sync( - network_metrics, - peer_type, - addr, - ctx, - target_peer_id.clone(), - new_edge, - routing_table, - ); - } - _ => error!(target: "network", "expected AddIbfSetResponse"), - } - }) - .spawn(ctx); - }, - ); - } - ); + )) + .into_actor(act) + .map(move |response, _act, ctx| match response.map(|r| r.into_inner()) { + Ok(RoutingTableMessagesResponse::RequestRoutingTableResponse { + edges_info: routing_table, + }) => { + Self::send_sync( + network_metrics, + peer_type, + addr, + ctx, + target_peer_id.clone(), + new_edge, + routing_table, + ); + } + _ => error!(target: "network", "expected AddIbfSetResponse"), + }) + .spawn(ctx); + }); } fn send_sync( @@ -789,9 +694,6 @@ impl PeerManagerActor { // update that represents the connection removal. self.connected_peers.remove(peer_id); - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - self.routing_table_addr.do_send(RoutingTableMessages::RemovePeer(peer_id.clone())); - if let Some(edge) = self.routing_table_view.get_local_edge(peer_id) { if edge.edge_type() == EdgeState::Active { let edge_update = edge.remove_edge(self.my_peer_id.clone(), &self.config.node_key); @@ -1018,7 +920,7 @@ impl PeerManagerActor { }); } - #[cfg(all(feature = "test_features", feature = "protocol_feature_routing_exchange_algorithm"))] + /// TEST-ONLY fn adv_remove_edges_from_routing_table( &mut self, edges: Vec, @@ -1859,20 +1761,6 @@ impl PeerManagerActor { NetworkResponses::NoResponse } - NetworkRequests::IbfMessage { peer_id, ibf_msg } => { - if cfg!(feature = "protocol_feature_routing_exchange_algorithm") { - match ibf_msg { - crate::network_protocol::RoutingSyncV2::Version2(ibf_msg) => { - if let Some(addr) = - self.connected_peers.get(&peer_id).map(|p| p.addr.clone()) - { - self.process_ibf_msg(&peer_id, ibf_msg, addr, throttle_controller) - } - } - } - } - NetworkResponses::NoResponse - } NetworkRequests::Challenge(challenge) => { // TODO(illia): smarter routing? Self::broadcast_message( @@ -1941,27 +1829,6 @@ impl PeerManagerActor { } } - #[cfg(all(feature = "test_features", feature = "protocol_feature_routing_exchange_algorithm"))] - #[perf] - fn handle_msg_start_routing_table_sync( - &self, - msg: crate::private_actix::StartRoutingTableSync, - ctx: &mut Context, - throttle_controller: Option, - ) { - if let Some(connected_peer) = self.connected_peers.get(&msg.peer_id) { - let addr = connected_peer.addr.clone(); - Self::initialize_routing_table_exchange( - msg.peer_id, - PeerType::Inbound, - addr, - ctx, - throttle_controller, - ); - } - } - - #[cfg(feature = "test_features")] #[perf] fn handle_msg_set_adv_options(&mut self, msg: crate::test_utils::SetAdvOptions) { if let Some(disable_edge_propagation) = msg.disable_edge_propagation { @@ -1979,7 +1846,6 @@ impl PeerManagerActor { } } - #[cfg(all(feature = "test_features", feature = "protocol_feature_routing_exchange_algorithm"))] #[perf] fn handle_msg_set_routing_table( &mut self, @@ -2253,19 +2119,10 @@ impl PeerManagerActor { self.handle_msg_ban(msg); PeerManagerMessageResponse::Ban(()) } - #[cfg(feature = "test_features")] - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - PeerManagerMessageRequest::StartRoutingTableSync(msg) => { - self.handle_msg_start_routing_table_sync(msg, ctx, throttle_controller); - PeerManagerMessageResponse::StartRoutingTableSync(()) - } - #[cfg(feature = "test_features")] PeerManagerMessageRequest::SetAdvOptions(msg) => { self.handle_msg_set_adv_options(msg); PeerManagerMessageResponse::SetAdvOptions(()) } - #[cfg(feature = "test_features")] - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] PeerManagerMessageRequest::SetRoutingTable(msg) => { self.handle_msg_set_routing_table(msg, ctx); PeerManagerMessageResponse::SetRoutingTable(()) @@ -2340,46 +2197,6 @@ impl PeerManagerActor { } } } - - fn process_ibf_msg( - &self, - peer_id: &PeerId, - mut ibf_msg: crate::network_protocol::RoutingVersion2, - addr: Addr, - throttle_controller: Option, - ) { - self.validate_edges_and_add_to_routing_table( - peer_id.clone(), - std::mem::take(&mut ibf_msg.edges), - throttle_controller.clone(), - ); - actix::spawn( - self.routing_table_addr - .send(ActixMessageWrapper::new_without_size( - RoutingTableMessages::ProcessIbfMessage { peer_id: peer_id.clone(), ibf_msg }, - throttle_controller, - )) - .then(move |response| { - match response.map(|r| r.into_inner()) { - Ok(RoutingTableMessagesResponse::ProcessIbfMessageResponse { - ibf_msg: response_ibf_msg, - }) => { - if let Some(response_ibf_msg) = response_ibf_msg { - addr.do_send(SendMessage { - message: PeerMessage::RoutingTableSyncV2( - crate::network_protocol::RoutingSyncV2::Version2( - response_ibf_msg, - ), - ), - }); - } - } - _ => error!(target: "network", "expected ProcessIbfMessageResponse"), - } - futures::future::ready(()) - }), - ); - } } impl Handler> for PeerManagerActor { diff --git a/chain/network/src/routing/ibf.rs b/chain/network/src/routing/ibf.rs deleted file mode 100644 index b8cbdb67dae..00000000000 --- a/chain/network/src/routing/ibf.rs +++ /dev/null @@ -1,250 +0,0 @@ -use borsh::{BorshDeserialize, BorshSerialize}; -use std::cmp::{max, min}; -use std::collections::hash_map::DefaultHasher; -use std::hash::Hasher; -use tracing::error; - -/// Ibf consists of multiple boxes, this constant specified the number of unique boxes -/// into which each element is inserted into IBF. According to https://www.ics.uci.edu/~eppstein/pubs/EppGooUye-SIGCOMM-11.pdf -/// either number 3 or 4 is optimal. -const NUM_HASHES: usize = 3; - -/// IbfElem represents a simple box, which may contain 0, 1 or more elements. -/// Each box stores xor of elements inserted inside the box, and xor of their hashes. -/// -/// The box considered to be empty if both xor of elements inserted and their hashes is equal to 0. -/// To check whenever only one element is inside the box, we can check whenever hash of `xor_element` -/// is equal to `xor_hash`. -#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug, Default)] -pub struct IbfBox { - xor_elem: u64, - xor_hash: u64, -} - -impl IbfBox { - fn new(elem: u64, hash: u64) -> IbfBox { - IbfBox { xor_elem: elem, xor_hash: hash } - } - - fn merge(&mut self, rhs: &IbfBox) { - self.xor_elem ^= rhs.xor_elem; - self.xor_hash ^= rhs.xor_hash; - } -} - -type IbfHasher = DefaultHasher; - -/// Data structure representing Inverse Bloom Filter (IBF). It's used to store elements of a set, -/// in the current implementation they are of type 'u64', but it could be extended. Each `IbfSet` -/// is represented by a list `data` of boxes. Each box contains xor of elements inserted inside given -/// box and xor of hashes of elements inserted into box. -/// -/// This data structure has a few important properties: -/// - given two IBFs each representing set of numbers. If you merge them together you get new IBF -/// representing set, which is a symmetric difference of those two sets. -/// - given IBF with size of `n`, you are able to recover the elements put into the set, -/// as long as the number of elements added is less than around 50%-60% for sufficiently large `n`. -/// -/// -/// https://www.ics.uci.edu/~eppstein/pubs/EppGooUye-SIGCOMM-11.pdf -#[derive(Clone)] -pub struct Ibf { - /// Defines length of Ibf, which is 2^k + NUM_HASHES - 1 - k: i32, - /// Vector containing all elements of IBF. Each one has xor of values stored, and xor of hashes stored. - pub data: Vec, - /// Hashing object used to generate pseudo-random number - hasher: IbfHasher, - /// Hash seed used to generate IBF - pub seed: u64, -} - -impl Ibf { - pub fn new(capacity: usize, seed: u64) -> Self { - let k = Ibf::calculate_k(capacity); - let new_capacity = (1 << k) + NUM_HASHES - 1; - - let mut hasher = IbfHasher::default(); - hasher.write_u64(seed); - Self { data: vec![IbfBox::default(); new_capacity], hasher, k, seed } - } - - /// Create Ibf from vector of elements and seed - pub fn from_vec(data: &[IbfBox], seed: u64) -> Self { - let k = Ibf::calculate_k(data.len()); - - let mut hasher = IbfHasher::default(); - hasher.write_u64(seed); - Self { data: data.into(), hasher, k, seed } - } - - /// Calculate minimum parameter 'k', such that given IBF has at least 'capacity` elements. - fn calculate_k(capacity: usize) -> i32 { - let mut k = 0; - // In order make computation of indexes not require doing division, we allocate - // 2^k + NUM_HASHES - 1 hashes. This allows us to use compute indexes using bits. - while (1 << k) + NUM_HASHES - 1 < capacity { - k += 1; - } - k - } - - /// Add element to the set. It's up to the caller to make sure this method is called only if - /// the item is not in the set. - pub fn add(&mut self, elem: u64) { - self.insert(elem) - } - - /// Remove element from the set. It's up to the caller to make sure this method is called only - /// if the item is in the set. - /// It's also worth noting that add/remove functions are identical. - pub fn remove(&mut self, elem: u64) { - self.insert(elem) - } - - /// Compute hash of element - fn compute_hash(&self, elem: u64) -> u64 { - let mut h = self.hasher.clone(); - h.write_u64(elem); - h.finish() - } - - /// Add element to IBF, which involves updating values of 3 boxes. - fn insert(&mut self, elem: u64) { - self.insert_value(elem); - } - - /// Merge two sets together, the result will be IbfSet representing symmetric difference - /// between those sets. - pub fn merge(&mut self, rhs_data: &[IbfBox], rhs_seed: u64) -> bool { - if self.data.len() != rhs_data.len() || self.seed != rhs_seed { - error!(target: "network", - "Failed to merge len: {} {} seed: {} {}", - self.data.len(), - rhs_data.len(), - self.seed, - rhs_seed - ); - return false; - } - for (lhs, rhs) in self.data.iter_mut().zip(rhs_data) { - lhs.merge(rhs) - } - true - } - - #[cfg(test)] - fn recover(&mut self) -> Result, &'static str> { - let (result, difference) = self.try_recover(); - - if difference != 0 { - for i in 0..self.data.len() { - if self.data[i].xor_elem != 0 { - println!( - "{} {:?} {}", - i, - self.data[i], - self.compute_hash(self.data[i].xor_elem) - ); - } - } - return Err("unable to recover result"); - } - Ok(result) - } - - /// Try to recover elements inserted into IBF. - /// - /// Returns list of recovered elements and number of boxes that were not recoverable. - pub fn try_recover(&mut self) -> (Vec, u64) { - let mut result = Vec::with_capacity(self.data.len()); - let mut to_check = Vec::with_capacity(self.data.len()); - for i in 0..self.data.len() { - to_check.push(i); - - while let Some(i) = to_check.pop() { - let elem = self.data[i].xor_elem; - if elem == 0 && self.data[i].xor_hash == 0 { - continue; - } - let elem_hash = self.compute_hash(elem); - if elem_hash != self.data[i].xor_hash { - continue; - } - - result.push(elem); - self.remove_element_and_add_recovered_items_to_queue( - elem, - elem_hash, - &mut to_check, - ); - } - } - let elems_that_differ = self.data.iter().filter(|it| it.xor_elem != 0).count() as u64; - (result, elems_that_differ) - } - - /// For given hash, generate list of indexes, where given element should be inserted. - fn generate_idx(&self, elem_hash: u64) -> [usize; NUM_HASHES] { - let mask = (1 << self.k) - 1; - let pos0 = elem_hash & mask; - let mut pos1 = (elem_hash >> self.k) & mask; - let mut pos2 = (elem_hash >> (2 * self.k)) & mask; - if pos1 >= pos0 { - pos1 = (pos1 + 1) & mask; - } - if pos2 >= min(pos0, pos1) { - pos2 = (pos2 + 1) & mask; - } - if pos2 >= max(pos0, pos1) { - pos2 = (pos2 + 1) & mask; - } - [pos0 as usize, pos1 as usize, pos2 as usize] - } - - /// Remove element from IBF, and add elements that were recovered to queue. - fn remove_element_and_add_recovered_items_to_queue( - &mut self, - elem: u64, - elem_hash: u64, - queue: &mut Vec, - ) { - let pos_list = self.generate_idx(elem_hash); - - for pos in pos_list { - self.data[pos].merge(&IbfBox::new(elem, elem_hash)); - queue.push(pos); - } - } - - /// Insert value into IBF - fn insert_value(&mut self, elem: u64) { - let elem_hash = self.compute_hash(elem); - let pos_list = self.generate_idx(elem_hash); - - for pos in pos_list { - self.data[pos].merge(&IbfBox::new(elem, elem_hash)); - } - } -} - -#[cfg(test)] -mod tests { - use crate::routing::ibf::Ibf; - - fn create_blt(elements: impl IntoIterator, capacity: usize) -> Ibf { - let mut sketch = Ibf::new(capacity, 0); - for item in elements.into_iter() { - sketch.add(item); - } - sketch - } - - #[test] - fn create_blt_test() { - let set = 1_000_000_300_000_u64..1_000_000_301_000_u64; - - assert_eq!(1000, create_blt(set, 2048).recover().unwrap().len()) - } -} diff --git a/chain/network/src/routing/ibf_peer_set.rs b/chain/network/src/routing/ibf_peer_set.rs deleted file mode 100644 index de98f22ec7f..00000000000 --- a/chain/network/src/routing/ibf_peer_set.rs +++ /dev/null @@ -1,265 +0,0 @@ -#![allow(dead_code)] -use crate::routing::ibf_set::IbfSet; -use borsh::{BorshDeserialize, BorshSerialize}; -use near_network_primitives::types::{Edge, SimpleEdge}; -use near_primitives::network::PeerId; -use rand::Rng; -use std::collections::HashMap; - -pub type SlotMapId = u64; - -#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug, Copy)] -pub struct ValidIBFLevel(pub u64); - -/// We create IbfSets of various sizes from 2^10+2 up to 2^17+2. Those constants specify valid ranges. -pub const MIN_IBF_LEVEL: ValidIBFLevel = ValidIBFLevel(10); -pub const MAX_IBF_LEVEL: ValidIBFLevel = ValidIBFLevel(17); - -/// Represents IbfLevel from 10 to 17. -impl ValidIBFLevel { - pub fn inc(&self) -> Option { - if self.0 + 1 >= MIN_IBF_LEVEL.0 && self.0 < MAX_IBF_LEVEL.0 { - Some(ValidIBFLevel(self.0 + 1)) - } else { - None - } - } - - pub fn is_valid(&self) -> bool { - self.0 >= MIN_IBF_LEVEL.0 && self.0 <= MAX_IBF_LEVEL.0 - } -} - -/// In order to reduce memory usage/bandwidth used we map each edge to u64. -/// SlotMap contains mapping from SimpleToHash, and vice versa. -#[derive(Default)] -pub struct SlotMap { - id: u64, - id2e: HashMap, - e2id: HashMap, -} - -impl SlotMap { - pub fn insert(&mut self, edge: &SimpleEdge) -> Option { - if self.e2id.contains_key(edge) { - return None; - } - - let new_id = self.id as SlotMapId; - self.id += 1; - - self.e2id.insert(edge.clone(), new_id); - self.id2e.insert(new_id, edge.clone()); - - Some(new_id) - } - - pub fn get(&self, edge: &SimpleEdge) -> Option { - self.e2id.get(edge).cloned() - } - - fn get_by_id(&self, id: &SlotMapId) -> Option<&SimpleEdge> { - self.id2e.get(id) - } - - fn pop(&mut self, edge: &SimpleEdge) -> Option { - if let Some(&id) = self.e2id.get(edge) { - self.e2id.remove(edge); - self.id2e.remove(&id); - - return Some(id); - } - None - } -} - -/// IBfPeerSet contains collection of IbfSets, each for one connected peer. -#[derive(Default)] -pub struct IbfPeerSet { - peers: HashMap>, - slot_map: SlotMap, - edges: u64, -} - -impl IbfPeerSet { - pub fn get(&self, peer_id: &PeerId) -> Option<&IbfSet> { - self.peers.get(peer_id) - } - - /// Add IbfSet assigned to given peer, defined by `seed`. - pub fn add_peer( - &mut self, - peer_id: PeerId, - seed: Option, - edges_info: &mut HashMap<(PeerId, PeerId), Edge>, - ) -> u64 { - if let Some(ibf_set) = self.peers.get(&peer_id) { - return ibf_set.get_seed(); - } - let seed = if let Some(seed) = seed { - seed - } else { - let mut rng = rand::thread_rng(); - rng.gen() - }; - - let mut ibf_set = IbfSet::new(seed); - // Initialize IbfSet with edges - for (key, e) in edges_info.iter() { - let se = SimpleEdge::new(key.0.clone(), key.1.clone(), e.nonce()); - if let Some(id) = self.slot_map.get(&se) { - ibf_set.add_edge(&se, id); - } - } - let seed = ibf_set.get_seed(); - self.peers.insert(peer_id, ibf_set); - seed - } - - /// Remove IbfSet associated with peer. - pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.peers.remove(peer_id); - } - - /// Add edge to each IbfSet for each peer. - pub fn add_edge(&mut self, edge: &SimpleEdge) -> Option { - let id = self.slot_map.insert(edge); - if let Some(id) = id { - self.edges += 1; - for (_, val) in self.peers.iter_mut() { - val.add_edge(edge, id); - } - } - id - } - - /// Remove edge from each IbfSet for each peer. - pub fn remove_edge(&mut self, edge: &SimpleEdge) -> bool { - if let Some(_id) = self.slot_map.pop(edge) { - self.edges -= 1; - for (_, val) in self.peers.iter_mut() { - val.remove_edge(edge); - } - return true; - } - false - } - - /// Recover edges based on list of SlotMapId - fn recover_edges<'a>( - &'a self, - edges: &'a [SlotMapId], - ) -> impl Iterator + 'a { - edges.iter().filter_map(|v| self.slot_map.get_by_id(v)) - } - - /// After we recover list of hashes, split edges between those that we know, and ones we don't know about. - pub fn split_edges_for_peer( - &self, - peer_id: &PeerId, - unknown_edges: &[u64], - ) -> (Vec, Vec) { - if let Some(ibf) = self.get(peer_id) { - let (known_edges, unknown_edges) = ibf.get_edges_by_hashes_ext(unknown_edges); - return (self.recover_edges(known_edges.as_slice()).cloned().collect(), unknown_edges); - } - Default::default() - } -} - -#[cfg(test)] -mod test { - use crate::routing::ibf_peer_set::{IbfPeerSet, SlotMap, ValidIBFLevel}; - use crate::routing::ibf_set::IbfSet; - use crate::test_utils::random_peer_id; - use near_network_primitives::types::{Edge, SimpleEdge}; - use near_primitives::network::PeerId; - use std::collections::HashMap; - - #[test] - fn test_slot_map() { - let p0 = random_peer_id(); - let p1 = random_peer_id(); - let p2 = random_peer_id(); - - let e0 = SimpleEdge::new(p0, p1.clone(), 0); - let e1 = SimpleEdge::new(p1.clone(), p2.clone(), 0); - let e2 = SimpleEdge::new(p1, p2, 3); - - let mut sm = SlotMap::default(); - assert_eq!(0_u64, sm.insert(&e0).unwrap()); - - assert!(sm.insert(&e0).is_none()); - - assert_eq!(1_u64, sm.insert(&e1).unwrap()); - assert_eq!(2_u64, sm.insert(&e2).unwrap()); - - assert_eq!(Some(2_u64), sm.pop(&e2)); - assert_eq!(None, sm.pop(&e2)); - assert_eq!(Some(0_u64), sm.pop(&e0)); - assert_eq!(None, sm.pop(&e0)); - - assert_eq!(Some(1_u64), sm.get(&e1)); - - assert_eq!(Some(&e1), sm.get_by_id(&1_u64)); - assert_eq!(None, sm.get_by_id(&1000_u64)); - - assert_eq!(Some(1_u64), sm.pop(&e1)); - assert_eq!(None, sm.get(&e1)); - assert_eq!(None, sm.pop(&e1)); - - assert_eq!(3_u64, sm.insert(&e2).unwrap()); - assert_eq!(Some(3_u64), sm.pop(&e2)); - - assert_eq!(None, sm.get_by_id(&1_u64)); - assert_eq!(None, sm.get_by_id(&1000_u64)); - } - - #[test] - fn test_adding_ibf_peer_set_adding_peers() { - let peer_id = random_peer_id(); - let peer_id2 = random_peer_id(); - let mut ips = IbfPeerSet::default(); - - let mut ibf_set = IbfSet::::new(1111); - - let edge = Edge::make_fake_edge(peer_id.clone(), peer_id2.clone(), 111); - let mut edges_info: HashMap<(PeerId, PeerId), Edge> = Default::default(); - edges_info.insert((peer_id.clone(), peer_id2.clone()), edge.clone()); - - // Add Peer - ips.add_peer(peer_id.clone(), Some(1111), &mut edges_info); - - // Remove Peer - assert!(ips.get(&peer_id).is_some()); - assert!(ips.get(&peer_id2).is_none()); - ips.remove_peer(&peer_id); - assert!(ips.get(&peer_id).is_none()); - - // Add Peer again - ips.add_peer(peer_id.clone(), Some(1111), &mut edges_info); - - // Add edge - let e = SimpleEdge::new(peer_id.clone(), peer_id2, 111); - let se = ips.add_edge(&e).unwrap(); - ibf_set.add_edge(&e, se); - assert!(ips.add_edge(&e).is_none()); - - assert!(ips.remove_edge(&e)); - assert!(!ips.remove_edge(&e)); - - assert!(ips.add_edge(&e).is_some()); - - let mut hashes = ibf_set.get_ibf(ValidIBFLevel(10)).clone().try_recover().0; - assert_eq!(1, hashes.len()); - - for x in 0..4 { - hashes.push(x); - } - - // try to recover the edge - assert_eq!(4, ips.split_edges_for_peer(&peer_id, &hashes).1.len()); - assert_eq!(vec!(edge.to_simple_edge()), ips.split_edges_for_peer(&peer_id, &hashes).0); - } -} diff --git a/chain/network/src/routing/ibf_set.rs b/chain/network/src/routing/ibf_set.rs deleted file mode 100644 index 938e8ccfa67..00000000000 --- a/chain/network/src/routing/ibf_set.rs +++ /dev/null @@ -1,139 +0,0 @@ -use crate::routing::ibf::{Ibf, IbfBox}; -use crate::routing::ibf_peer_set::{SlotMapId, ValidIBFLevel, MAX_IBF_LEVEL, MIN_IBF_LEVEL}; -use near_stable_hasher::StableHasher; -use std::collections::HashMap; -use std::fmt; -use std::fmt::Debug; -use std::hash::{Hash, Hasher}; -use std::marker::PhantomData; -use tracing::{error, warn}; - -/// Stores list of `Ibf` data structures of various sizes. -/// In the current implementation we use sizes from 2^10+ 2 ... 2^17 + 2. -#[derive(Default)] -pub struct IbfSet { - seed: u64, - ibf: Vec, - h2e: HashMap, - hasher: StableHasher, - pd: PhantomData, -} - -impl Debug for IbfSet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("").field(&self.seed).field(&self.ibf.len()).field(&self.h2e.len()).finish() - } -} - -impl IbfSet -where - T: Hash + Clone, -{ - pub fn get_ibf_vec(&self, k: ValidIBFLevel) -> &Vec { - &self.ibf[(k.0 - MIN_IBF_LEVEL.0) as usize].data - } - - /// Get seed used to generate this IbfSet - pub fn get_seed(&self) -> u64 { - self.seed - } - - /// Get `Ibf` based on selected `k`. - pub fn get_ibf(&self, k: ValidIBFLevel) -> &Ibf { - &self.ibf[(k.0 - MIN_IBF_LEVEL.0) as usize] - } - - pub fn new(seed: u64) -> Self { - let ibf = (MIN_IBF_LEVEL.0..=MAX_IBF_LEVEL.0).map(|i| Ibf::new(1 << i, seed ^ i)); - let mut hasher = StableHasher::default(); - hasher.write_u64(seed); - Self { - seed, - ibf: ibf.collect(), - h2e: Default::default(), - hasher, - pd: PhantomData::::default(), - } - } - - /// Get list of edges based on given list of hashes. - pub fn get_edges_by_hashes_ext(&self, edges: &[u64]) -> (Vec, Vec) { - let mut known_edges = vec![]; - let mut unknown_edges = vec![]; - - for hash in edges { - if let Some(edge) = self.h2e.get(hash) { - known_edges.push(*edge) - } else { - unknown_edges.push(*hash) - } - } - (known_edges, unknown_edges) - } - - /// Add edge to all IBFs - pub fn add_edge(&mut self, item: &T, id: SlotMapId) -> bool { - let mut h = self.hasher.clone(); - item.hash(&mut h); - let h = h.finish(); - if self.h2e.insert(h, id).is_some() { - warn!(target: "network", "hash already exists in IbfSet"); - return false; - } - for ibf in self.ibf.iter_mut() { - ibf.add(h); - } - true - } - - /// Remove edge from all IBFs - pub fn remove_edge(&mut self, item: &T) -> bool { - let mut h = self.hasher.clone(); - item.hash(&mut h); - let h = h.finish(); - if self.h2e.remove(&h).is_none() { - error!(target: "network", "trying to remove not existing edge from IbfSet"); - return false; - } - for ibf in self.ibf.iter_mut() { - ibf.remove(h); - } - true - } -} - -#[cfg(test)] -mod test { - use crate::routing::ibf_peer_set::{SlotMapId, ValidIBFLevel}; - use crate::routing::ibf_set::IbfSet; - - #[test] - fn test_ibf_set() { - let mut a = IbfSet::::new(12); - let mut b = IbfSet::::new(12); - - for i in 0..10000 { - a.add_edge(&(i as u64), (i + 1000000) as SlotMapId); - } - for i in 0..10 { - a.remove_edge(&(i as u64)); - } - for i in 0..10000 { - b.add_edge(&(i + 100_u64), (i + 2000000) as SlotMapId); - } - for i in 10..=17 { - let mut ibf1 = a.get_ibf(ValidIBFLevel(i)).clone(); - let ibf2 = b.get_ibf(ValidIBFLevel(i)); - ibf1.merge(&ibf2.data, ibf2.seed); - let (mut res, diff) = ibf1.try_recover(); - assert_eq!(0, diff); - assert_eq!(200 - 10, res.len()); - - for x in 0..333 { - res.push(x + 33333333); - } - assert_eq!(100 - 10, a.get_edges_by_hashes_ext(&res).0.len()); - assert_eq!(100 + 333, a.get_edges_by_hashes_ext(&res).1.len()); - } - } -} diff --git a/chain/network/src/routing/mod.rs b/chain/network/src/routing/mod.rs index 4eb47a400bc..5010c0df8da 100644 --- a/chain/network/src/routing/mod.rs +++ b/chain/network/src/routing/mod.rs @@ -1,14 +1,9 @@ pub(crate) mod edge_validator_actor; pub mod graph; -pub(crate) mod ibf; -pub(crate) mod ibf_peer_set; -pub(crate) mod ibf_set; mod route_back_cache; #[cfg(feature = "test_features")] pub use crate::private_actix::GetRoutingTableResult; pub(crate) mod routing_table_actor; pub mod routing_table_view; -pub use crate::routing::ibf_peer_set::SlotMapId; -pub use crate::routing::ibf_set::IbfSet; pub use routing_table_actor::start_routing_table_actor; diff --git a/chain/network/src/routing/routing_table_actor.rs b/chain/network/src/routing/routing_table_actor.rs index 0a153615d12..9a999c66b58 100644 --- a/chain/network/src/routing/routing_table_actor.rs +++ b/chain/network/src/routing/routing_table_actor.rs @@ -45,8 +45,6 @@ pub enum Prune { pub struct RoutingTableActor { /// Data structure with all edges. It's guaranteed that `peer.0` < `peer.1`. pub edges_info: HashMap<(PeerId, PeerId), Edge>, - /// Data structure used for exchanging routing tables. - pub peer_ibf_set: crate::routing::ibf_peer_set::IbfPeerSet, /// Current view of the network represented by undirected graph. /// Nodes are Peers and edges are active connections. pub raw_graph: Graph, @@ -88,7 +86,6 @@ impl RoutingTableActor { let edge_validator_pool = SyncArbiter::start(4, || EdgeValidatorActor {}); Self { edges_info: Default::default(), - peer_ibf_set: Default::default(), raw_graph: Graph::new(my_peer_id), peer_forwarding: Default::default(), peer_last_time_reachable: Default::default(), @@ -108,9 +105,6 @@ impl RoutingTableActor { } pub fn remove_edge(&mut self, edge: &Edge) { - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - self.peer_ibf_set.remove_edge(&edge.to_simple_edge()); - let key = edge.key(); if self.edges_info.remove(key).is_some() { self.raw_graph.remove_edge(&edge.key().0, &edge.key().1); @@ -135,8 +129,6 @@ impl RoutingTableActor { self.raw_graph.remove_edge(&key.0, &key.1); } } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - self.peer_ibf_set.add_edge(&edge.to_simple_edge()); self.edges_info.insert(key.clone(), edge); true } @@ -456,21 +448,9 @@ pub enum RoutingTableMessages { /// `signature1` is valid. AddVerifiedEdges { edges: Vec }, /// Remove edges for unit tests - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] AdvRemoveEdges(Vec), /// Get `RoutingTable` for debugging purposes. RequestRoutingTable, - /// Add `PeerId` and generate `IbfSet`. - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - AddPeerIfMissing(PeerId, Option), - /// Remove `PeerId` from `IbfSet` - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - RemovePeer(PeerId), - /// Do new routing table exchange algorithm. - ProcessIbfMessage { peer_id: PeerId, ibf_msg: crate::types::RoutingVersion2 }, - /// Start new routing table sync. - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - StartRoutingTableSync { seed: u64 }, /// Request routing table update and maybe prune edges. RoutingTableUpdate { prune: Prune, prune_edges_not_reachable_for: Duration }, /// Gets list of edges to validate from another peer. @@ -481,20 +461,11 @@ pub enum RoutingTableMessages { #[derive(actix::MessageResponse, Debug)] pub enum RoutingTableMessagesResponse { - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - AddPeerResponse { - seed: u64, - }, Empty, - ProcessIbfMessageResponse { - ibf_msg: Option, - }, RequestRoutingTableResponse { edges_info: Vec, }, AddVerifiedEdgesResponse(Vec), - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - StartRoutingTableSyncResponse(crate::types::RoutingSyncV2), RoutingTableUpdateResponse { /// PeerManager maintains list of local edges. We will notify `PeerManager` /// to remove those edges. @@ -506,31 +477,6 @@ pub enum RoutingTableMessagesResponse { }, } -impl RoutingTableActor { - pub fn exchange_routing_tables_using_ibf( - &self, - peer_id: &PeerId, - ibf_set: &crate::routing::IbfSet, - ibf_level: crate::routing::ibf_peer_set::ValidIBFLevel, - ibf_vec: &[crate::routing::ibf::IbfBox], - seed: u64, - ) -> (Vec, Vec, u64) { - let ibf = ibf_set.get_ibf(ibf_level); - - let mut new_ibf = crate::routing::ibf::Ibf::from_vec(ibf_vec, seed ^ (ibf_level.0 as u64)); - - if !new_ibf.merge(&ibf.data, seed ^ (ibf_level.0 as u64)) { - tracing::error!(target: "network", "exchange routing tables failed with peer {}", peer_id); - return (Default::default(), Default::default(), 0); - } - - let (edge_hashes, unknown_edges_count) = new_ibf.try_recover(); - let (known, unknown_edges) = self.peer_ibf_set.split_edges_for_peer(peer_id, &edge_hashes); - - (known, unknown_edges, unknown_edges_count) - } -} - impl Handler for RoutingTableActor { type Result = RoutingTableMessagesResponse; @@ -571,18 +517,6 @@ impl Handler for RoutingTableActor { peers_to_ban: std::mem::take(&mut self.peers_to_ban), } } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - RoutingTableMessages::StartRoutingTableSync { seed } => { - RoutingTableMessagesResponse::StartRoutingTableSyncResponse( - crate::types::RoutingSyncV2::Version2(crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: Default::default(), - routing_state: crate::types::RoutingState::InitializeIbf, - }), - ) - } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] RoutingTableMessages::AdvRemoveEdges(edges) => { for edge in edges.iter() { self.remove_edge(edge); @@ -594,145 +528,6 @@ impl Handler for RoutingTableActor { edges_info: self.edges_info.iter().map(|(_k, v)| v.clone()).collect(), } } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - RoutingTableMessages::AddPeerIfMissing(peer_id, ibf_set) => { - let seed = self.peer_ibf_set.add_peer(peer_id, ibf_set, &mut self.edges_info); - RoutingTableMessagesResponse::AddPeerResponse { seed } - } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - RoutingTableMessages::RemovePeer(peer_id) => { - self.peer_ibf_set.remove_peer(&peer_id); - RoutingTableMessagesResponse::Empty - } - RoutingTableMessages::ProcessIbfMessage { peer_id, ibf_msg } => { - match ibf_msg.routing_state { - crate::types::RoutingState::PartialSync(partial_sync) => { - if let Some(ibf_set) = self.peer_ibf_set.get(&peer_id) { - let seed = ibf_msg.seed; - let (edges_for_peer, unknown_edge_hashes, unknown_edges_count) = self - .exchange_routing_tables_using_ibf( - &peer_id, - ibf_set, - partial_sync.ibf_level, - &partial_sync.ibf, - ibf_msg.seed, - ); - - let edges_for_peer = edges_for_peer - .iter() - .filter_map(|x| self.edges_info.get(x.key()).cloned()) - .collect(); - // Prepare message - let ibf_msg = if unknown_edges_count == 0 - && !unknown_edge_hashes.is_empty() - { - crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: edges_for_peer, - routing_state: crate::types::RoutingState::RequestMissingEdges( - unknown_edge_hashes, - ), - } - } else if unknown_edges_count == 0 && unknown_edge_hashes.is_empty() { - crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: edges_for_peer, - routing_state: crate::types::RoutingState::Done, - } - } else if let Some(new_ibf_level) = partial_sync.ibf_level.inc() { - let ibf_vec = ibf_set.get_ibf_vec(new_ibf_level); - crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: edges_for_peer, - routing_state: crate::types::RoutingState::PartialSync( - crate::types::PartialSync { - ibf_level: new_ibf_level, - ibf: ibf_vec.clone(), - }, - ), - } - } else { - crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: self.edges_info.iter().map(|x| x.1.clone()).collect(), - routing_state: crate::types::RoutingState::RequestAllEdges, - } - }; - RoutingTableMessagesResponse::ProcessIbfMessageResponse { - ibf_msg: Some(ibf_msg), - } - } else { - tracing::error!(target: "network", "Peer not found {}", peer_id); - RoutingTableMessagesResponse::Empty - } - } - crate::types::RoutingState::InitializeIbf => { - self.peer_ibf_set.add_peer( - peer_id.clone(), - Some(ibf_msg.seed), - &mut self.edges_info, - ); - if let Some(ibf_set) = self.peer_ibf_set.get(&peer_id) { - let seed = ibf_set.get_seed(); - let ibf_vec = - ibf_set.get_ibf_vec(crate::routing::ibf_peer_set::MIN_IBF_LEVEL); - RoutingTableMessagesResponse::ProcessIbfMessageResponse { - ibf_msg: Some(crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: Default::default(), - routing_state: crate::types::RoutingState::PartialSync( - crate::types::PartialSync { - ibf_level: crate::routing::ibf_peer_set::MIN_IBF_LEVEL, - ibf: ibf_vec.clone(), - }, - ), - }), - } - } else { - tracing::error!(target: "network", "Peer not found {}", peer_id); - RoutingTableMessagesResponse::Empty - } - } - crate::types::RoutingState::RequestMissingEdges(requested_edges) => { - let seed = ibf_msg.seed; - let (edges_for_peer, _) = - self.peer_ibf_set.split_edges_for_peer(&peer_id, &requested_edges); - - let edges_for_peer = edges_for_peer - .iter() - .filter_map(|x| self.edges_info.get(x.key()).cloned()) - .collect(); - - let ibf_msg = crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed, - edges: edges_for_peer, - routing_state: crate::types::RoutingState::Done, - }; - RoutingTableMessagesResponse::ProcessIbfMessageResponse { - ibf_msg: Some(ibf_msg), - } - } - crate::types::RoutingState::RequestAllEdges => { - RoutingTableMessagesResponse::ProcessIbfMessageResponse { - ibf_msg: Some(crate::types::RoutingVersion2 { - known_edges: self.edges_info.len() as u64, - seed: ibf_msg.seed, - edges: self.get_all_edges(), - routing_state: crate::types::RoutingState::Done, - }), - } - } - crate::types::RoutingState::Done => { - RoutingTableMessagesResponse::ProcessIbfMessageResponse { ibf_msg: None } - } - } - } } } } diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 4432b7954c1..96f109fa161 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -457,7 +457,6 @@ pub struct SetAdvOptions { pub set_max_peers: Option, } -#[cfg(feature = "test_features")] #[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] #[derive(Message, Clone, Debug)] #[rtype(result = "()")] diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 6c61cc4acdc..e865fd285cc 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -2,7 +2,6 @@ pub use crate::network_protocol::{ Encoding, Handshake, HandshakeFailureReason, PeerMessage, RoutingTableUpdate, }; -pub use crate::network_protocol::{PartialSync, RoutingState, RoutingSyncV2, RoutingVersion2}; use crate::private_actix::{ PeerRequestResult, PeersRequest, RegisterPeer, RegisterPeerResponse, Unregister, }; @@ -118,13 +117,9 @@ pub enum PeerManagerMessageRequest { InboundTcpConnect(InboundTcpConnect), Unregister(Unregister), Ban(Ban), - #[cfg(feature = "test_features")] - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - StartRoutingTableSync(crate::private_actix::StartRoutingTableSync), - #[cfg(feature = "test_features")] + /// TEST-ONLY SetAdvOptions(crate::test_utils::SetAdvOptions), - #[cfg(feature = "test_features")] - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] + /// TEST-ONLY allows for modifying the internal routing table. SetRoutingTable(crate::test_utils::SetRoutingTable), } @@ -161,13 +156,9 @@ pub enum PeerManagerMessageResponse { InboundTcpConnect(()), Unregister(()), Ban(()), - #[cfg(feature = "test_features")] - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - StartRoutingTableSync(()), - #[cfg(feature = "test_features")] + /// TEST-ONLY SetAdvOptions(()), - #[cfg(feature = "test_features")] - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] + /// TEST-ONLY SetRoutingTable(()), } @@ -342,12 +333,6 @@ pub enum NetworkRequests { /// A challenge to invalidate a block. Challenge(Challenge), - - // IbfMessage - IbfMessage { - peer_id: PeerId, - ibf_msg: RoutingSyncV2, - }, } /// Combines peer address info, chain and edge information. From 5004e316134c43da68cf925ee4a740fe7014a7c3 Mon Sep 17 00:00:00 2001 From: nikurt <86772482+nikurt@users.noreply.github.com> Date: Mon, 13 Jun 2022 19:58:58 +0200 Subject: [PATCH 4/4] Trace code execution across actix queues for PeerManagerActor->PeerActor messages (#7005) When an actix actor handles a message from its queue, it does that outside of any spans. This PR attaches the context of the requester to the request. Thus making the request handling seem like a direct child of spans of the requester. In the future we'd like to trace execution across process boundaries. For example, to see a partial encoded chunk request made by process A, its handling by process B, and receiving a response by process A again. However, to make this possible we first need to trace execution within the process across the actix system. --- Cargo.lock | 3 + chain/network-primitives/Cargo.toml | 1 + chain/network-primitives/src/types.rs | 8 + chain/network/Cargo.toml | 2 + chain/network/src/peer/peer_actor.rs | 33 +++-- .../src/peer_manager/peer_manager_actor.rs | 137 +++++++++++++----- chain/network/src/private_actix.rs | 1 + chain/network/src/tests/peer_actor.rs | 8 +- chain/network/src/types.rs | 4 +- core/o11y/src/lib.rs | 30 +++- 10 files changed, 173 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c2c64c4ee2..5fbac9a9b85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3080,6 +3080,7 @@ dependencies = [ "near-stable-hasher", "near-store", "once_cell", + "opentelemetry", "parking_lot 0.12.1", "protobuf 3.0.2", "protobuf-codegen", @@ -3093,6 +3094,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "tracing-opentelemetry", ] [[package]] @@ -3107,6 +3109,7 @@ dependencies = [ "near-crypto", "near-primitives", "once_cell", + "opentelemetry", "serde", "strum", "time 0.3.9", diff --git a/chain/network-primitives/Cargo.toml b/chain/network-primitives/Cargo.toml index e5fe2c749db..4b119df489b 100644 --- a/chain/network-primitives/Cargo.toml +++ b/chain/network-primitives/Cargo.toml @@ -17,6 +17,7 @@ borsh = "0.9" once_cell = "1.12.0" chrono = { version = "0.4.4", features = ["serde"] } deepsize = { version = "0.2.0", optional = true } +opentelemetry = { version = "0.17", features = ["trace"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } strum = { version = "0.24", features = ["derive"] } time = "0.3.9" diff --git a/chain/network-primitives/src/types.rs b/chain/network-primitives/src/types.rs index 4262d911353..edb4b2a286d 100644 --- a/chain/network-primitives/src/types.rs +++ b/chain/network-primitives/src/types.rs @@ -246,6 +246,14 @@ pub enum PeerManagerRequest { UnregisterPeer, } +/// Messages from PeerManager to Peer with a tracing Context. +#[derive(Message, Debug)] +#[rtype(result = "()")] +pub struct PeerManagerRequestWithContext { + pub msg: PeerManagerRequest, + pub context: opentelemetry::Context, +} + #[derive(Debug, Clone)] pub struct KnownProducer { pub account_id: AccountId, diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index 0631906a672..3e3119f8c41 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -29,6 +29,7 @@ lru = "0.7.2" thiserror = "1" near-rust-allocator-proxy = { version = "0.4", optional = true } once_cell = "1.12.0" +opentelemetry = { version = "0.17", features = ["trace"] } rand = "0.6" rand_pcg = "0.1" serde = { version = "1", features = ["alloc", "derive", "rc"], optional = true } @@ -37,6 +38,7 @@ tokio-stream = { version = "0.1.2", features = ["net"] } tokio-util = { version = "0.7.1", features = ["codec"] } tokio = { version = "1.1", features = ["net", "rt-multi-thread"] } tracing = "0.1.13" +tracing-opentelemetry = { version = "0.17" } assert_matches = "1.3" delay-detector = { path = "../../tools/delay_detector" } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index ee6e56b5ef9..adcbb041d48 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -18,9 +18,11 @@ use lru::LruCache; use near_crypto::Signature; use near_network_primitives::types::{ Ban, NetworkViewClientMessages, NetworkViewClientResponses, PeerChainInfoV2, PeerIdOrHash, - PeerInfo, PeerManagerRequest, PeerType, ReasonForBan, RoutedMessage, RoutedMessageBody, - RoutedMessageFrom, StateResponseInfo, UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE, + PeerInfo, PeerManagerRequest, PeerManagerRequestWithContext, PeerType, ReasonForBan, + RoutedMessage, RoutedMessageBody, RoutedMessageFrom, StateResponseInfo, + UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE, }; +use tracing_opentelemetry::OpenTelemetrySpanExt; use near_network_primitives::types::{Edge, PartialEdgeInfo}; use near_performance_metrics::framed_write::{FramedWrite, WriteHandler}; @@ -1051,7 +1053,9 @@ impl Handler for PeerActor { #[perf] fn handle(&mut self, msg: SendMessage, _: &mut Self::Context) { - trace!(target: "network", "SendMessage"); + let span = + tracing::trace_span!(target: "network", "handle", handler="SendMessage").entered(); + span.set_parent(msg.context); let _d = delay_detector::DelayDetector::new(|| "send message".into()); self.send_message_or_log(&msg.message); } @@ -1062,7 +1066,9 @@ impl Handler> for PeerActor { #[perf] fn handle(&mut self, msg: Arc, _: &mut Self::Context) { - trace!(target: "network", "SendMessage"); + let span = + tracing::trace_span!(target: "network", "handle", handler="SendMessage").entered(); + span.set_parent(msg.context.clone()); let _d = delay_detector::DelayDetector::new(|| "send message".into()); self.send_message_or_log(&msg.as_ref().message); } @@ -1072,8 +1078,10 @@ impl Handler for PeerActor { type Result = PeerStatsResult; #[perf] - fn handle(&mut self, _msg: QueryPeerStats, _: &mut Self::Context) -> Self::Result { - trace!(target: "network", "QueryPeerStats"); + fn handle(&mut self, msg: QueryPeerStats, _: &mut Self::Context) -> Self::Result { + let span = + tracing::trace_span!(target: "network", "handle", handler="QueryPeerStats").entered(); + span.set_parent(msg.context); let _d = delay_detector::DelayDetector::new(|| "query peer stats".into()); // TODO(#5218) Refactor this code to use `SystemTime` @@ -1098,12 +1106,19 @@ impl Handler for PeerActor { } } -impl Handler for PeerActor { +impl Handler for PeerActor { type Result = (); #[perf] - fn handle(&mut self, msg: PeerManagerRequest, ctx: &mut Self::Context) -> Self::Result { - trace!(target: "network", "PeerManagerRequest"); + fn handle( + &mut self, + msg: PeerManagerRequestWithContext, + ctx: &mut Self::Context, + ) -> Self::Result { + let span = tracing::trace_span!(target: "network", "handle", handler="PeerManagerRequest") + .entered(); + span.set_parent(msg.context); + let msg = msg.msg; let _d = delay_detector::DelayDetector::new(|| format!("peer manager request {:?}", msg).into()); match msg { diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index d3071bded72..1d43020d132 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -27,8 +27,9 @@ use anyhow::bail; use near_network_primitives::types::{ AccountOrPeerIdOrHash, Ban, Edge, InboundTcpConnect, KnownPeerStatus, KnownProducer, NetworkConfig, NetworkViewClientMessages, NetworkViewClientResponses, OutboundTcpConnect, - PeerIdOrHash, PeerInfo, PeerManagerRequest, PeerType, Ping, Pong, RawRoutedMessage, - ReasonForBan, RoutedMessage, RoutedMessageBody, RoutedMessageFrom, StateResponseInfo, + PeerIdOrHash, PeerInfo, PeerManagerRequest, PeerManagerRequestWithContext, PeerType, Ping, + Pong, RawRoutedMessage, ReasonForBan, RoutedMessage, RoutedMessageBody, RoutedMessageFrom, + StateResponseInfo, }; use near_network_primitives::types::{EdgeState, PartialEdgeInfo}; use near_performance_metrics::framed_write::FramedWrite; @@ -53,7 +54,8 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; /// How often to request peers from active peers. const REQUEST_PEERS_INTERVAL: Duration = Duration::from_millis(60_000); @@ -282,7 +284,8 @@ impl Actor for PeerManagerActor { /// Try to gracefully disconnect from connected peers. fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { - let msg = SendMessage { message: PeerMessage::Disconnect }; + let msg = + SendMessage { message: PeerMessage::Disconnect, context: Span::current().context() }; for connected_peer in self.connected_peers.values() { connected_peer.addr.do_send(msg.clone()); @@ -426,6 +429,7 @@ impl PeerManagerActor { &self.connected_peers, SendMessage { message: PeerMessage::SyncRoutingTable(RoutingTableUpdate::from_accounts(accounts)), + context: Span::current().context(), }, ) } @@ -486,6 +490,8 @@ impl PeerManagerActor { /// Receives list of edges that were verified, in a trigger every 20ms, and adds them to /// the routing table. fn broadcast_validated_edges_trigger(&mut self, ctx: &mut Context, interval: Duration) { + let span = + tracing::trace_span!(target: "network", "broadcast_validated_edges_trigger").entered(); let start = Clock::instant(); let mut new_edges = Vec::new(); while let Some(edge) = self.routing_table_exchange_helper.edges_to_add_receiver.pop() { @@ -528,21 +534,32 @@ impl PeerManagerActor { self.routing_table_addr .send(RoutingTableMessages::AddVerifiedEdges { edges: new_edges }) .into_actor(self) - .map(move |response, act, _ctx| match response { - Ok(RoutingTableMessagesResponse::AddVerifiedEdgesResponse(filtered_edges)) => { - // Broadcast new edges to all other peers. - if act.adv_helper.can_broadcast_edges() { - let sync_routing_table = RoutingTableUpdate::from_edges(filtered_edges); - Self::broadcast_message( - network_metrics, - &act.connected_peers, - SendMessage { - message: PeerMessage::SyncRoutingTable(sync_routing_table), - }, - ) + .map(move |response, act, _ctx| { + let _span = tracing::trace_span!( + target: "network", + parent: &span, + "broadcast_validated_edges_trigger_response") + .entered(); + match response { + Ok(RoutingTableMessagesResponse::AddVerifiedEdgesResponse( + filtered_edges, + )) => { + // Broadcast new edges to all other peers. + if act.adv_helper.can_broadcast_edges() { + let sync_routing_table = + RoutingTableUpdate::from_edges(filtered_edges); + Self::broadcast_message( + network_metrics, + &act.connected_peers, + SendMessage { + message: PeerMessage::SyncRoutingTable(sync_routing_table), + context: Span::current().context(), + }, + ) + } } + _ => error!(target: "network", "expected AddIbfSetResponse"), } - _ => error!(target: "network", "expected AddIbfSetResponse"), }) .spawn(ctx); }; @@ -569,6 +586,7 @@ impl PeerManagerActor { throttle_controller: ThrottleController, ctx: &mut Context, ) { + let span = tracing::trace_span!(target: "network", "register_peer").entered(); debug!(target: "network", ?full_peer_info, "Consolidated connection"); if self.outgoing_peers.contains(&full_peer_info.peer_info.id) { @@ -615,21 +633,26 @@ impl PeerManagerActor { Some(throttle_controller), )) .into_actor(act) - .map(move |response, _act, ctx| match response.map(|r| r.into_inner()) { - Ok(RoutingTableMessagesResponse::RequestRoutingTableResponse { - edges_info: routing_table, - }) => { - Self::send_sync( - network_metrics, - peer_type, - addr, - ctx, - target_peer_id.clone(), - new_edge, - routing_table, - ); + .map(move |response, _act, ctx| { + let _span = + tracing::trace_span!(target: "network", parent: &span, "RequestRoutingTableResponse") + .entered(); + match response.map(|r| r.into_inner()) { + Ok(RoutingTableMessagesResponse::RequestRoutingTableResponse { + edges_info: routing_table, + }) => { + Self::send_sync( + network_metrics, + peer_type, + addr, + ctx, + target_peer_id.clone(), + new_edge, + routing_table, + ); + } + _ => error!(target: "network", "expected AddIbfSetResponse"), } - _ => error!(target: "network", "expected AddIbfSetResponse"), }) .spawn(ctx); }); @@ -644,7 +667,10 @@ impl PeerManagerActor { new_edge: Edge, known_edges: Vec, ) { + let span = tracing::trace_span!(target: "network", "send_sync").entered(); near_performance_metrics::actix::run_later(ctx, WAIT_FOR_SYNC_DELAY, move |act, _ctx| { + let _span = tracing::trace_span!(target: "network", parent: &span, "send_sync_attempt") + .entered(); // Start syncing network point of view. Wait until both parties are connected before start // sending messages. let known_accounts = act.routing_table_view.get_announce_accounts(); @@ -653,10 +679,14 @@ impl PeerManagerActor { known_edges, known_accounts.cloned().collect(), )), + context: Span::current().context(), }); // Ask for peers list on connection. - addr.do_send(SendMessage { message: PeerMessage::PeersRequest }); + addr.do_send(SendMessage { + message: PeerMessage::PeersRequest, + context: Span::current().context(), + }); if let Some(connected_peer) = act.connected_peers.get_mut(&target_peer_id) { connected_peer.last_time_peer_requested = Clock::instant(); } @@ -671,6 +701,7 @@ impl PeerManagerActor { message: PeerMessage::SyncRoutingTable(RoutingTableUpdate::from_edges( vec![new_edge], )), + context: Span::current().context(), }, ); } @@ -705,6 +736,7 @@ impl PeerManagerActor { message: PeerMessage::SyncRoutingTable(RoutingTableUpdate::from_edges( vec![edge_update], )), + context: Span::current().context(), }, ); } @@ -749,7 +781,10 @@ impl PeerManagerActor { /// and then mark peer as banned in the peer store. pub(crate) fn try_ban_peer(&mut self, peer_id: &PeerId, ban_reason: ReasonForBan) { if let Some(peer) = self.connected_peers.get(peer_id) { - peer.addr.do_send(PeerManagerRequest::BanPeer(ban_reason)); + peer.addr.do_send(PeerManagerRequestWithContext { + msg: PeerManagerRequest::BanPeer(ban_reason), + context: Span::current().context(), + }); } else { warn!(target: "network", ?ban_reason, ?peer_id, "Try to ban a disconnected peer for"); // Call `ban_peer` in peer manager to trigger action that persists information @@ -904,7 +939,8 @@ impl PeerManagerActor { /// Query current peers for more peers. fn query_connected_peers_for_more_peers(&mut self) { let mut requests = futures::stream::FuturesUnordered::new(); - let msg = SendMessage { message: PeerMessage::PeersRequest }; + let msg = + SendMessage { message: PeerMessage::PeersRequest, context: Span::current().context() }; for connected_peer in self.connected_peers.values_mut() { if connected_peer.last_time_peer_requested.elapsed() > REQUEST_PEERS_INTERVAL { connected_peer.last_time_peer_requested = Clock::instant(); @@ -962,6 +998,7 @@ impl PeerManagerActor { message: PeerMessage::SyncRoutingTable(RoutingTableUpdate::from_edges( vec![new_edge], )), + context: Span::current().context(), }, ); } @@ -1010,7 +1047,10 @@ impl PeerManagerActor { if *cur_nonce == nonce { if let Some(peer) = act.connected_peers.get(&other) { // Send disconnect signal to this peer if we haven't edge update. - peer.addr.do_send(PeerManagerRequest::UnregisterPeer); + peer.addr.do_send(PeerManagerRequestWithContext { + msg: PeerManagerRequest::UnregisterPeer, + context: Span::current().context(), + }); } act.local_peer_pending_update_nonce_request.remove(&other); } @@ -1033,7 +1073,7 @@ impl PeerManagerActor { for (peer_id, connected_peer) in self.connected_peers.iter() { let peer_id1 = peer_id.clone(); - (connected_peer.addr.send(QueryPeerStats {}).into_actor(self)) + (connected_peer.addr.send(QueryPeerStats {context: Span::current().context()}).into_actor(self)) .map(move |res, act, _| { match res { Ok(res) => { @@ -1142,7 +1182,10 @@ impl PeerManagerActor { let candidates = self.connected_peers.iter().filter(|(id, _)| !safe_set.contains(id)); if let Some((id, p)) = candidates.choose(&mut rand::thread_rng()) { debug!(target: "network", ?id, "Stop active connection"); - p.addr.do_send(PeerManagerRequest::UnregisterPeer); + p.addr.do_send(PeerManagerRequestWithContext { + msg: PeerManagerRequest::UnregisterPeer, + context: Span::current().context(), + }); } } @@ -1164,6 +1207,7 @@ impl PeerManagerActor { mut interval: Duration, (default_interval, max_interval): (Duration, Duration), ) { + let _span = tracing::trace_span!(target: "network", "monitor_peers_trigger").entered(); let mut to_unban = vec![]; for (peer_id, peer_state) in self.peer_store.iter() { if let KnownPeerStatus::Banned(_, last_banned) = peer_state.status { @@ -1285,6 +1329,7 @@ impl PeerManagerActor { message: PeerMessage::SyncRoutingTable(RoutingTableUpdate::from_accounts( vec![announce_account], )), + context: Span::current().context(), }, ); } @@ -1300,7 +1345,9 @@ impl PeerManagerActor { if let Some(connected_peer) = connected_peers.get(&peer_id) { let msg_kind = message.msg_variant().to_string(); trace!(target: "network", ?msg_kind, "Send message"); - connected_peer.addr.do_send(SendMessage { message }); + connected_peer + .addr + .do_send(SendMessage { message, context: Span::current().context() }); true } else { debug!(target: "network", @@ -1483,6 +1530,7 @@ impl PeerManagerActor { ctx: &mut Context, throttle_controller: Option, ) -> NetworkResponses { + let span = tracing::trace_span!(target: "network", "handle_msg_network_requests").entered(); let _d = delay_detector::DelayDetector::new(|| { format!("network request {}", msg.as_ref()).into() }); @@ -1492,7 +1540,10 @@ impl PeerManagerActor { Self::broadcast_message( self.network_metrics.clone(), &self.connected_peers, - SendMessage { message: PeerMessage::Block(block) }, + SendMessage { + message: PeerMessage::Block(block), + context: Span::current().context(), + }, ); NetworkResponses::NoResponse } @@ -1743,6 +1794,7 @@ impl PeerManagerActor { .send(NetworkViewClientMessages::AnnounceAccount(accounts)) .into_actor(self) .then(move |response, act, _ctx| { + let _span = tracing::trace_span!(target: "network", parent: &span, "announce_account").entered(); match response { Ok(NetworkViewClientResponses::Ban { ban_reason }) => { act.try_ban_peer(&peer_id_clone, ban_reason); @@ -1766,7 +1818,10 @@ impl PeerManagerActor { Self::broadcast_message( self.network_metrics.clone(), &self.connected_peers, - SendMessage { message: PeerMessage::Challenge(challenge) }, + SendMessage { + message: PeerMessage::Challenge(challenge), + context: Span::current().context(), + }, ); NetworkResponses::NoResponse } @@ -2069,6 +2124,8 @@ impl PeerManagerActor { ctx: &mut Context, throttle_controller: Option, ) -> PeerManagerMessageResponse { + let _span = + tracing::trace_span!(target: "network", "handle_peer_manager_message").entered(); match msg { PeerManagerMessageRequest::RoutedMessageFrom(msg) => { PeerManagerMessageResponse::RoutedMessageFrom(self.handle_msg_routed_from(msg)) diff --git a/chain/network/src/private_actix.rs b/chain/network/src/private_actix.rs index 4b6433bed3b..b444ec08c7c 100644 --- a/chain/network/src/private_actix.rs +++ b/chain/network/src/private_actix.rs @@ -100,6 +100,7 @@ pub struct GetPeerId {} #[rtype(result = "()")] pub struct SendMessage { pub(crate) message: PeerMessage, + pub(crate) context: opentelemetry::Context, } #[cfg(feature = "test_features")] diff --git a/chain/network/src/tests/peer_actor.rs b/chain/network/src/tests/peer_actor.rs index a4d21df8834..5c098c0280d 100644 --- a/chain/network/src/tests/peer_actor.rs +++ b/chain/network/src/tests/peer_actor.rs @@ -32,6 +32,8 @@ use std::sync::Arc; use std::time; use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; pub struct PeerConfig { pub signer: InMemorySigner, @@ -239,7 +241,11 @@ impl PeerHandle { } pub async fn send(&self, message: PeerMessage) { - self.actix.addr.send(SendMessage { message }).await.unwrap(); + self.actix + .addr + .send(SendMessage { message, context: Span::current().context() }) + .await + .unwrap(); } pub fn routed_message(&self, body: RoutedMessageBody, peer_id: PeerId) -> Box { diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index e865fd285cc..98e5496a595 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -30,7 +30,9 @@ use std::fmt::Debug; /// Peer stats query. #[derive(actix::Message)] #[rtype(result = "PeerStatsResult")] -pub struct QueryPeerStats {} +pub struct QueryPeerStats { + pub(crate) context: opentelemetry::Context, +} /// Peer stats result #[derive(Debug, actix::MessageResponse)] diff --git a/core/o11y/src/lib.rs b/core/o11y/src/lib.rs index c45833ae430..a26dc74497b 100644 --- a/core/o11y/src/lib.rs +++ b/core/o11y/src/lib.rs @@ -58,13 +58,28 @@ pub struct DefaultSubscriberGuard { writer_guard: tracing_appender::non_blocking::WorkerGuard, } +// Doesn't define WARN and ERROR, because the highest verbosity of spans is INFO. +#[derive(Copy, Clone, Debug, clap::ArgEnum)] +pub enum OpenTelemetryLevel { + OFF, + INFO, + DEBUG, + TRACE, +} + +impl Default for OpenTelemetryLevel { + fn default() -> Self { + OpenTelemetryLevel::OFF + } +} + /// Configures exporter of span and trace data. // Currently empty, but more fields will be added in the future. #[derive(Debug, Default, Parser)] pub struct Options { /// Enables export of span data using opentelemetry exporters. - #[clap(long)] - opentelemetry: bool, + #[clap(long, arg_enum, default_value = "off")] + opentelemetry: OpenTelemetryLevel, /// Whether the log needs to be colored. #[clap(long, arg_enum, default_value = "auto")] @@ -162,11 +177,20 @@ where ) .install_batch(opentelemetry::runtime::Tokio) .unwrap(); - let filter = if config.opentelemetry { LevelFilter::DEBUG } else { LevelFilter::OFF }; + let filter = get_opentelemetry_filter(config); let layer = tracing_opentelemetry::layer().with_tracer(tracer).with_filter(filter); layer } +fn get_opentelemetry_filter(config: &Options) -> LevelFilter { + match config.opentelemetry { + OpenTelemetryLevel::OFF => LevelFilter::OFF, + OpenTelemetryLevel::INFO => LevelFilter::INFO, + OpenTelemetryLevel::DEBUG => LevelFilter::DEBUG, + OpenTelemetryLevel::TRACE => LevelFilter::TRACE, + } +} + /// Run the code with a default subscriber set to the option appropriate for the NEAR code. /// /// This will override any subscribers set until now, and will be in effect until the value