diff --git a/Cargo.lock b/Cargo.lock index 42618b10465..3ac344c50ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,9 +2106,9 @@ dependencies = [ [[package]] name = "linked-hash-map" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83" +checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" [[package]] name = "loadtester" @@ -2420,6 +2420,7 @@ dependencies = [ "delay-detector", "futures", "lazy_static", + "linked-hash-map", "log", "near-chain", "near-chain-configs", diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 21ee0a2abc4..edad1ebe7e6 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -15,8 +15,7 @@ use crate::lightclient::get_epoch_block_producers_view; use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode}; use crate::types::{ AcceptedBlock, ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader, - BlockHeaderInfo, BlockStatus, BlockSyncResponse, ChainGenesis, Provenance, ReceiptList, - RuntimeAdapter, + BlockHeaderInfo, BlockStatus, ChainGenesis, Provenance, ReceiptList, RuntimeAdapter, }; use crate::validate::{ validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra, @@ -827,57 +826,8 @@ impl Chain { chain_update.commit() } - /// Check if state download is required, otherwise return hashes of blocks to fetch. - /// Hashes are sorted increasingly by height. - pub fn check_state_needed( - &mut self, - block_fetch_horizon: BlockHeightDelta, - force_block_sync: bool, - ) -> Result { - let block_head = self.head()?; - let header_head = self.header_head()?; - let mut hashes = vec![]; - - // If latest block is up to date return early. - // No state download is required, neither any blocks need to be fetched. - if block_head.height >= header_head.height { - return Ok(BlockSyncResponse::None); - } - - let next_epoch_id = - self.get_block_header(&block_head.last_block_hash)?.next_epoch_id().clone(); - - // Don't run State Sync if header head is not more than one epoch ahead. - if block_head.epoch_id != header_head.epoch_id && next_epoch_id != header_head.epoch_id { - if block_head.height < header_head.height.saturating_sub(block_fetch_horizon) - && !force_block_sync - { - // Epochs are different and we are too far from horizon, State Sync is needed - return Ok(BlockSyncResponse::StateNeeded); - } - } - - // Find hashes of blocks to sync - let mut current = self.get_block_header(&header_head.last_block_hash).map(|h| h.clone()); - while let Ok(header) = current { - if header.height() <= block_head.height { - if self.is_on_current_chain(&header).is_ok() { - break; - } - } - - hashes.push(*header.hash()); - current = self.get_previous_header(&header).map(|h| h.clone()); - } - - // Sort hashes by height - hashes.reverse(); - - Ok(BlockSyncResponse::BlocksNeeded(hashes)) - } - /// Returns if given block header is on the current chain. - fn is_on_current_chain(&mut self, header: &BlockHeader) -> Result<(), Error> { + pub fn is_on_current_chain(&mut self, header: &BlockHeader) -> Result<(), Error> { let chain_header = self.get_header_by_height(header.height())?; if chain_header.hash() == header.hash() { Ok(()) diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 0a1c51935f2..c15f4698569 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -584,17 +584,6 @@ pub struct LatestKnown { pub seen: u64, } -/// When running block sync response to know if the node needs to sync state, -/// or the hashes from the blocks that are needed. -pub enum BlockSyncResponse { - /// State is needed before we start fetching recent blocks. - StateNeeded, - /// We are up to date with state, list of block hashes that need to be fetched. - BlocksNeeded(Vec), - /// We are up to date, nothing is required. - None, -} - #[cfg(test)] mod tests { use chrono::Utc; diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index c23a0f6d845..b9cc45ea85a 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -22,6 +22,7 @@ lazy_static = "1.4" borsh = "0.7.1" reed-solomon-erasure = "4" num-rational = "0.2.4" +linked-hash-map = "0.5.3" near-crypto = { path = "../../core/crypto" } near-primitives = { path = "../../core/primitives" } diff --git a/chain/client/src/sync.rs b/chain/client/src/sync.rs index 83f2eedbf0f..eab6a4c0df4 100644 --- a/chain/client/src/sync.rs +++ b/chain/client/src/sync.rs @@ -8,11 +8,11 @@ use std::{ops::Add, time::Duration as TimeDuration}; use ansi_term::Color::{Purple, Yellow}; use chrono::{DateTime, Duration, Utc}; use futures::{future, FutureExt}; +use linked_hash_map::LinkedHashMap; use log::{debug, error, info, warn}; use rand::seq::{IteratorRandom, SliceRandom}; use rand::{thread_rng, Rng}; -use near_chain::types::BlockSyncResponse; use near_chain::{Chain, RuntimeAdapter}; use near_network::types::{AccountOrPeerIdOrHash, NetworkResponses, ReasonForBan}; use near_network::{FullPeerInfo, NetworkAdapter, NetworkRequests}; @@ -24,6 +24,7 @@ use near_primitives::utils::to_timestamp; use crate::types::{DownloadStatus, ShardSyncDownload, ShardSyncStatus, SyncStatus}; use cached::{Cached, SizedCache}; +use near_primitives::block_header::BlockHeader; /// Maximum number of block headers send over the network. pub const MAX_BLOCK_HEADERS: u64 = 512; @@ -38,9 +39,13 @@ const MAX_BLOCK_REQUEST: usize = 100; const MAX_PEER_BLOCK_REQUEST: usize = 10; const BLOCK_REQUEST_TIMEOUT: i64 = 6; -const BLOCK_SOME_RECEIVED_TIMEOUT: i64 = 1; +const BLOCK_SOME_RECEIVED_TIMEOUT: i64 = 2; const BLOCK_REQUEST_BROADCAST_OFFSET: u64 = 2; +/// If the number of hashes in the cache is below this number, we should +/// refill the cache to request more blocks. +const BLOCK_SYNC_CACHE_LOWER_LIMIT: usize = 10; + /// Sync state download timeout in seconds. pub const STATE_SYNC_TIMEOUT: i64 = 10; /// Maximum number of state parts to request per peer on each round when node is trying to download the state. @@ -331,6 +336,36 @@ fn get_locator_heights(height: u64) -> Vec { heights } +/// Cache for block sync that stores the hashes to request in insertion order. +/// It also maintains the last final block header to minimize the impact of reorgs. +#[derive(Default)] +struct BlockSyncCache { + hashes: LinkedHashMap, + last_header: Option, +} + +impl BlockSyncCache { + fn len(&self) -> usize { + self.hashes.len() + } + + fn insert(&mut self, block_hash: CryptoHash) { + self.hashes.insert(block_hash, ()); + } + + fn remove(&mut self, block_hash: &CryptoHash) { + self.hashes.remove(block_hash); + if self.hashes.is_empty() { + self.last_header = None; + } + } + + fn clear(&mut self) { + self.hashes.clear(); + self.last_header = None; + } +} + /// Helper to track block syncing. pub struct BlockSync { network_adapter: Arc, @@ -341,6 +376,8 @@ pub struct BlockSync { block_fetch_horizon: BlockHeightDelta, /// Whether to enforce block sync archive: bool, + /// Cache that stores hashes to request + cache: BlockSyncCache, } impl BlockSync { @@ -356,6 +393,7 @@ impl BlockSync { prev_blocks_received: 0, block_fetch_horizon, archive, + cache: BlockSyncCache::default(), } } @@ -369,7 +407,7 @@ impl BlockSync { highest_height_peers: &[FullPeerInfo], ) -> Result { if self.block_sync_due(chain)? { - if self.block_sync(chain, highest_height_peers, self.block_fetch_horizon)? { + if self.block_sync(chain, highest_height_peers)? { debug!(target: "sync", "Sync: transition to State Sync."); return Ok(true); } @@ -380,57 +418,150 @@ impl BlockSync { Ok(false) } + /// Check if state download is required + fn check_state_needed(&self, chain: &Chain) -> Result { + let head = chain.head()?; + let header_head = chain.header_head()?; + + // If latest block is up to date return early. + // No state download is required, neither any blocks need to be fetched. + if head.height >= header_head.height { + return Ok(false); + } + + // Don't run State Sync if header head is not more than one epoch ahead. + if head.epoch_id != header_head.epoch_id && head.next_epoch_id != header_head.epoch_id { + if head.height < header_head.height.saturating_sub(self.block_fetch_horizon) + && !self.archive + { + // Epochs are different and we are too far from horizon, State Sync is needed + return Ok(true); + } + } + + Ok(false) + } + + fn compute_hashes_to_request( + &mut self, + new_hashes: &[CryptoHash], + chain: &mut Chain, + block_count: usize, + ) -> Vec { + let mut res = vec![]; + let mut hashes_to_remove = vec![]; + for (block_hash, _) in self.cache.hashes.iter() { + if res.len() >= block_count { + break; + } + let block_exists = chain.block_exists(block_hash).unwrap_or(false); + // Only remove hash if the block is accepted. Otherwise keep them in the cache in case + // they get evicted from orphan pool. + if block_exists { + hashes_to_remove.push(*block_hash); + } + if block_exists || chain.is_orphan(block_hash) || chain.is_chunk_orphan(block_hash) { + continue; + } else { + res.push(*block_hash); + } + } + for hash in hashes_to_remove { + self.cache.remove(&hash); + } + for hash in new_hashes.iter().rev().take(block_count.saturating_sub(res.len())) { + res.push(*hash); + } + res + } + + /// Fetch the block hashes that we need to request for block sync. + fn fetch_block_hashes_to_request( + &mut self, + chain: &mut Chain, + block_count: usize, + ) -> Result, near_chain::Error> { + let last_header = match self.cache.last_header { + Some(ref h) => h.clone(), + None => chain.head_header()?.clone(), + }; + let head = chain.head()?; + // if cache contains outdated information, reset it. + if last_header.height() <= head.height { + self.cache.clear(); + } + let header_head = chain.header_head()?; + if header_head.height <= head.height { + return Ok(vec![]); + } + let close_to_header_head = + last_header.height() > header_head.height.saturating_sub(self.block_fetch_horizon); + if self.cache.len() < BLOCK_SYNC_CACHE_LOWER_LIMIT || close_to_header_head { + self.cache.clear(); + let mut hashes_to_request = vec![]; + + // Find hashes of blocks to sync + let mut current = + chain.get_block_header(&header_head.last_block_hash).map(|h| h.clone()); + while let Ok(header) = current { + if header.height() <= head.height { + if chain.is_on_current_chain(&header).is_ok() { + break; + } + } + + hashes_to_request.push(*header.hash()); + current = chain.get_previous_header(&header).map(|h| h.clone()); + } + let res = self.compute_hashes_to_request(&hashes_to_request, chain, block_count); + + self.cache.hashes.reserve(hashes_to_request.len()); + for hash in hashes_to_request.into_iter().rev() { + self.cache.insert(hash); + } + self.cache.last_header = + chain.get_block_header(&header_head.last_block_hash).map(|h| h.clone()).ok(); + Ok(res) + } else { + Ok(self.compute_hashes_to_request(&[], chain, block_count)) + } + } + /// Returns true if state download is required (last known block is too far). /// Otherwise request recent blocks from peers round robin. pub fn block_sync( &mut self, chain: &mut Chain, highest_height_peers: &[FullPeerInfo], - block_fetch_horizon: BlockHeightDelta, ) -> Result { - match chain.check_state_needed(block_fetch_horizon, self.archive)? { - BlockSyncResponse::StateNeeded => { - return Ok(true); - } - BlockSyncResponse::BlocksNeeded(hashes) => { - // Ask for `num_peers * MAX_PEER_BLOCK_REQUEST` blocks up to 100, throttle if there is too many orphans in the chain. - let block_count = min( - min(MAX_BLOCK_REQUEST, MAX_PEER_BLOCK_REQUEST * highest_height_peers.len()), - near_chain::MAX_ORPHAN_SIZE.saturating_sub(chain.orphans_len()) + 1, - ); + if self.check_state_needed(chain)? { + return Ok(true); + } + // Ask for `num_peers * MAX_PEER_BLOCK_REQUEST` blocks up to 100, throttle if there is too many orphans in the chain. + let block_count = min( + min(MAX_BLOCK_REQUEST, MAX_PEER_BLOCK_REQUEST * highest_height_peers.len()), + near_chain::MAX_ORPHAN_SIZE.saturating_sub(chain.orphans_len()) + 1, + ); + let hashes_to_request = self.fetch_block_hashes_to_request(chain, block_count)?; + if !hashes_to_request.is_empty() { + let head = chain.head()?; + let header_head = chain.header_head()?; - let hashes_to_request = hashes - .iter() - .filter(|x| { - !chain.get_block(x).is_ok() - && !chain.is_orphan(x) - && !chain.is_chunk_orphan(x) - }) - .take(block_count) - .collect::>(); - - if hashes_to_request.len() > 0 { - let head = chain.head()?; - let header_head = chain.header_head()?; - - debug!(target: "sync", "Block sync: {}/{} requesting blocks {:?} from {} peers", head.height, header_head.height, hashes_to_request, highest_height_peers.len()); - - self.blocks_requested = 0; - self.receive_timeout = Utc::now() + Duration::seconds(BLOCK_REQUEST_TIMEOUT); - - let mut peers_iter = highest_height_peers.iter().cycle(); - for hash in hashes_to_request.into_iter() { - if let Some(peer) = peers_iter.next() { - self.network_adapter.do_send(NetworkRequests::BlockRequest { - hash: hash.clone(), - peer_id: peer.peer_info.id.clone(), - }); - self.blocks_requested += 1; - } - } + debug!(target: "sync", "Block sync: {}/{} requesting blocks {:?} from {} peers", head.height, header_head.height, hashes_to_request, highest_height_peers.len()); + + self.blocks_requested = 0; + self.receive_timeout = Utc::now() + Duration::seconds(BLOCK_REQUEST_TIMEOUT); + + let mut peers_iter = highest_height_peers.iter().cycle(); + for hash in hashes_to_request { + if let Some(peer) = peers_iter.next() { + self.network_adapter.do_send(NetworkRequests::BlockRequest { + hash: hash.clone(), + peer_id: peer.peer_info.id.clone(), + }); + self.blocks_requested += 1; } } - BlockSyncResponse::None => {} } Ok(false) @@ -1047,7 +1178,7 @@ mod test { use std::thread; use near_chain::test_utils::{setup, setup_with_validators}; - use near_chain::Provenance; + use near_chain::{ChainGenesis, ChainStoreAccess, ChainStoreUpdate, ErrorKind, Provenance}; use near_crypto::{KeyType, PublicKey}; use near_network::routing::EdgeInfo; use near_network::test_utils::MockNetworkAdapter; @@ -1057,11 +1188,15 @@ mod test { use near_primitives::network::PeerId; use super::*; + use crate::test_utils::TestEnv; + use near_chain::types::LatestKnown; use near_primitives::merkle::PartialMerkleTree; use near_primitives::types::EpochId; use near_primitives::validator_signer::InMemoryValidatorSigner; use near_primitives::version::PROTOCOL_VERSION; use num_rational::Ratio; + use std::collections::HashSet; + use std::ops::Range; #[test] fn test_get_locator_heights() { @@ -1288,4 +1423,149 @@ mod test { assert!(false); } } + + /// Helper function for block sync tests + fn collect_hashes_from_network_adapter( + network_adapter: Arc, + ) -> HashSet { + let mut requested_block_hashes = HashSet::new(); + let mut network_request = network_adapter.requests.write().unwrap(); + while let Some(request) = network_request.pop_back() { + match request { + NetworkRequests::BlockRequest { hash, .. } => { + requested_block_hashes.insert(hash); + } + _ => panic!("unexpected network request {:?}", request), + } + } + requested_block_hashes + } + + fn create_peer_infos(num_peers: usize) -> Vec { + (0..num_peers) + .map(|_| FullPeerInfo { + peer_info: PeerInfo { + id: PeerId::new(PublicKey::empty(KeyType::ED25519)), + addr: None, + account_id: None, + }, + chain_info: Default::default(), + edge_info: Default::default(), + }) + .collect() + } + + #[test] + fn test_block_sync() { + let network_adapter = Arc::new(MockNetworkAdapter::default()); + let block_fetch_horizon = 10; + let mut block_sync = BlockSync::new(network_adapter.clone(), block_fetch_horizon, false); + let mut chain_genesis = ChainGenesis::test(); + chain_genesis.epoch_length = 100; + let mut env = TestEnv::new(chain_genesis, 2, 1); + let mut blocks = vec![]; + for i in 1..21 { + let block = env.clients[0].produce_block(i).unwrap().unwrap(); + blocks.push(block.clone()); + env.process_block(0, block, Provenance::PRODUCED); + } + let block_headers = blocks.iter().map(|b| b.header().clone()).collect::>(); + let peer_infos = create_peer_infos(2); + env.clients[1].chain.sync_block_headers(block_headers, |_| unreachable!()).unwrap(); + let is_state_sync = block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap(); + assert!(!is_state_sync); + let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone()); + assert_eq!( + requested_block_hashes, + blocks.iter().map(|b| *b.hash()).collect::>() + ); + let last_block_header = blocks.last().unwrap().header().clone(); + assert_eq!(block_sync.cache.len() as u64, last_block_header.height()); + assert_eq!(block_sync.cache.last_header, Some(last_block_header.clone())); + + // receive some blocks + for i in 1..5 { + env.process_block(1, blocks[i - 1].clone(), Provenance::NONE); + } + + // receive a block that is orphaned + let (_, res) = env.clients[1].process_block(blocks[6].clone(), Provenance::NONE); + assert!(matches!(res.unwrap_err().kind(), ErrorKind::Orphan)); + + // run the same block sync again when there are hashes in the cache + let is_state_sync = block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap(); + assert!(!is_state_sync); + let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone()); + let expected_request_hashes = blocks[4..].iter().map(|b| *b.hash()).collect::>(); + assert_eq!(requested_block_hashes, expected_request_hashes); + assert_eq!(block_sync.cache.len() as u64, last_block_header.height() - 4); + assert_eq!(block_sync.cache.last_header, Some(last_block_header.clone())); + + // Receive all blocks. Should not request more. + for i in 5..21 { + env.process_block(1, blocks[i - 1].clone(), Provenance::NONE); + } + block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap(); + let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone()); + assert!(requested_block_hashes.is_empty()); + assert!(block_sync.cache.hashes.is_empty()); + } + + #[test] + fn test_block_sync_with_reorg_past_final_block() { + let network_adapter = Arc::new(MockNetworkAdapter::default()); + let block_fetch_horizon = 30; + let mut block_sync = BlockSync::new(network_adapter.clone(), block_fetch_horizon, false); + let mut chain_genesis = ChainGenesis::test(); + chain_genesis.epoch_length = 100; + let mut env = TestEnv::new(chain_genesis, 2, 1); + let mut blocks = vec![]; + let produce_block_range = + |range: Range, env: &mut TestEnv, blocks: &mut Vec| { + for i in range { + let block = env.clients[0].produce_block(i).unwrap().unwrap(); + blocks.push(block.clone()); + env.process_block(0, block, Provenance::PRODUCED); + } + }; + produce_block_range(1..10, &mut env, &mut blocks); + let fork_block = env.clients[0].produce_block(22).unwrap().unwrap(); + let mut chain_update = ChainStoreUpdate::new(env.clients[0].chain.mut_store()); + chain_update + .save_latest_known(LatestKnown { + height: blocks.last().unwrap().header().height(), + seen: blocks.last().unwrap().header().raw_timestamp(), + }) + .unwrap(); + chain_update.commit().unwrap(); + produce_block_range(10..21, &mut env, &mut blocks); + let peer_infos = create_peer_infos(2); + let block_headers = blocks.iter().map(|b| b.header().clone()).collect::>(); + env.clients[1].chain.sync_block_headers(block_headers, |_| unreachable!()).unwrap(); + let is_state_sync = block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap(); + assert!(!is_state_sync); + let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone()); + assert_eq!( + requested_block_hashes, + blocks.iter().map(|b| *b.hash()).collect::>() + ); + let last_block_header = blocks.last().unwrap().header().clone(); + assert_eq!(block_sync.cache.len() as u64, last_block_header.height()); + assert_eq!(block_sync.cache.last_header, Some(last_block_header.clone())); + + env.clients[1] + .chain + .sync_block_headers(vec![fork_block.header().clone()], |_| unreachable!()) + .unwrap(); + let is_state_sync = block_sync.block_sync(&mut env.clients[1].chain, &peer_infos).unwrap(); + assert!(!is_state_sync); + let mut expected_block_hashes = + blocks[..9].iter().map(|b| *b.hash()).collect::>(); + expected_block_hashes.insert(*fork_block.hash()); + let requested_block_hashes = collect_hashes_from_network_adapter(network_adapter.clone()); + + assert_eq!(requested_block_hashes, expected_block_hashes); + assert_eq!(block_sync.cache.len(), expected_block_hashes.len()); + assert_eq!(block_sync.cache.last_header, Some(fork_block.header().clone())); + } }