diff --git a/README.md b/README.md
index f283a2c1145..7a84800696d 100644
--- a/README.md
+++ b/README.md
@@ -11,17 +11,16 @@
 ## Reference implementation of NEAR Protocol
 
+[![Build Status][ci-badge-master]][ci-url]
+![Stable Status][stable-release]
+![Prerelease Status][prerelease]
 [![codecov][codecov-badge]][codecov-url]
 [![Discord chat][discord-badge]][discord-url]
 [![Telegram Group][telegram-badge]][telegram-url]
 
-master | beta | stable
----|---|---|
-[![Build Status][ci-badge-master]][ci-url] | [![Build Status][ci-badge-beta]][ci-url] | [![Build Status][ci-badge-stable]][ci-url]
-
+[stable-release]: https://img.shields.io/github/v/release/nearprotocol/nearcore?label=stable
+[prerelease]: https://img.shields.io/github/v/release/nearprotocol/nearcore?include_prereleases&label=prerelease
 [ci-badge-master]: https://badge.buildkite.com/a81147cb62c585cc434459eedd1d25e521453120ead9ee6c64.svg?branch=master
-[ci-badge-beta]: https://badge.buildkite.com/a81147cb62c585cc434459eedd1d25e521453120ead9ee6c64.svg?branch=beta
-[ci-badge-stable]: https://badge.buildkite.com/a81147cb62c585cc434459eedd1d25e521453120ead9ee6c64.svg?branch=stable
 [ci-url]: https://buildkite.com/nearprotocol/nearcore
 [codecov-badge]: https://codecov.io/gh/nearprotocol/nearcore/branch/master/graph/badge.svg
 [codecov-url]: https://codecov.io/gh/nearprotocol/nearcore
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index f4492997a0d..91a21ae0c5a 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -5,7 +5,7 @@ use std::time::{Duration as TimeDuration, Instant};
 use borsh::BorshSerialize;
 use chrono::Duration;
 use chrono::Utc;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use rand::rngs::StdRng;
 use rand::seq::SliceRandom;
 use rand::SeedableRng;
@@ -400,9 +400,9 @@ impl Chain {
     pub fn save_block(&mut self, block: &Block) -> Result<(), Error> {
         let mut chain_store_update = ChainStoreUpdate::new(&mut self.store);
 
-        if !block.check_validity() {
+        if let Err(e) = block.check_validity() {
             byzantine_assert!(false);
-            return Err(ErrorKind::Other("Invalid block".into()).into());
+            return Err(e.into());
         }
 
         chain_store_update.save_block(block.clone());
@@ -421,6 +421,14 @@ impl Chain {
         });
     }
 
+    fn save_block_height_processed(&mut self, block_height: BlockHeight) -> Result<(), Error> {
+        let mut chain_store_update = ChainStoreUpdate::new(&mut self.store);
+        if !chain_store_update.is_height_processed(block_height)? {
+            chain_store_update.save_block_height_processed(block_height);
+        }
+        Ok(())
+    }
+
     // GC CONTRACT
     // ===
     //
@@ -605,6 +613,13 @@ impl Chain {
         Ok(())
     }
 
+    /// Do basic validation of a block upon receiving it. Check that the header is valid
+    /// and the block is well-formed (various roots match).
+    pub fn validate_block(&mut self, block: &Block) -> Result<(), Error> {
+        self.process_block_header(&block.header(), |_| {})?;
+        block.check_validity().map_err(|e| e.into())
+    }
+
     /// Process a block header received during "header first" propagation.
     pub fn process_block_header(
         &mut self,
@@ -1020,10 +1035,6 @@ impl Chain {
     {
         near_metrics::inc_counter(&metrics::BLOCK_PROCESSED_TOTAL);
 
-        if block.chunks().len() != self.runtime_adapter.num_shards() as usize {
-            return Err(ErrorKind::IncorrectNumberOfChunkHeaders.into());
-        }
-
         let prev_head = self.store.head()?;
         let mut chain_update = ChainUpdate::new(
             &mut self.store,
@@ -1035,9 +1046,11 @@ impl Chain {
             self.doomslug_threshold_mode,
         );
 
         let maybe_new_head = chain_update.process_block(me, &block, &provenance, on_challenge);
+        let block_height = block.header().height();
 
         match maybe_new_head {
             Ok((head, needs_to_start_fetching_state)) => {
+                chain_update.chain_store_update.save_block_height_processed(block_height);
                 chain_update.commit()?;
 
                 if needs_to_start_fetching_state {
@@ -1079,62 +1092,64 @@ impl Chain {
 
                 Ok(head)
             }
-            Err(e) => match e.kind() {
-                ErrorKind::Orphan => {
-                    let tail_height = self.store.tail()?;
-                    // we only add blocks that couldn't have been gc'ed to the orphan pool.
-                    if block.header().height() >= tail_height {
+            Err(e) => {
+                match e.kind() {
+                    ErrorKind::Orphan => {
+                        let tail_height = self.store.tail()?;
+                        // we only add blocks that couldn't have been gc'ed to the orphan pool.
+                        if block_height >= tail_height {
+                            let block_hash = *block.hash();
+                            let orphan = Orphan { block, provenance, added: Instant::now() };
+
+                            self.orphans.add(orphan);
+
+                            debug!(
+                                target: "chain",
+                                "Process block: orphan: {:?}, # orphans {}{}",
+                                block_hash,
+                                self.orphans.len(),
+                                if self.orphans.len_evicted() > 0 {
+                                    format!(", # evicted {}", self.orphans.len_evicted())
+                                } else {
+                                    String::new()
+                                },
+                            );
+                        }
+                    }
+                    ErrorKind::ChunksMissing(missing_chunks) => {
                         let block_hash = *block.hash();
+                        block_misses_chunks(missing_chunks.clone());
                         let orphan = Orphan { block, provenance, added: Instant::now() };
 
-                        self.orphans.add(orphan);
+                        self.blocks_with_missing_chunks.add(orphan);
 
                         debug!(
                             target: "chain",
-                            "Process block: orphan: {:?}, # orphans {}{}",
-                            block_hash,
-                            self.orphans.len(),
-                            if self.orphans.len_evicted() > 0 {
-                                format!(", # evicted {}", self.orphans.len_evicted())
-                            } else {
-                                String::new()
-                            },
+                            "Process block: missing chunks. Block hash: {:?}. Missing chunks: {:?}",
+                            block_hash, missing_chunks,
                         );
                     }
-                    Err(e)
-                }
-                ErrorKind::ChunksMissing(missing_chunks) => {
-                    let block_hash = *block.hash();
-                    block_misses_chunks(missing_chunks.clone());
-                    let orphan = Orphan { block, provenance, added: Instant::now() };
-
-                    self.blocks_with_missing_chunks.add(orphan);
-
-                    debug!(
-                        target: "chain",
-                        "Process block: missing chunks. Block hash: {:?}. Missing chunks: {:?}",
-                        block_hash, missing_chunks,
-                    );
-                    Err(e)
-                }
-                ErrorKind::EpochOutOfBounds => {
-                    // Possibly block arrived before we finished processing all of the blocks for epoch before last.
-                    // Or someone is attacking with invalid chain.
-                    debug!(target: "chain", "Received block {}/{} ignored, as epoch is unknown", block.header().height(), block.hash());
-                    Err(e)
+                    ErrorKind::EpochOutOfBounds => {
+                        // Possibly block arrived before we finished processing all of the blocks for epoch before last.
+                        // Or someone is attacking with invalid chain.
+ debug!(target: "chain", "Received block {}/{} ignored, as epoch is unknown", block_height, block.hash()); + } + ErrorKind::Unfit(ref msg) => { + debug!( + target: "chain", + "Block {} at {} is unfit at this time: {}", + block.hash(), + block_height, + msg + ); + } + _ => {} } - ErrorKind::Unfit(ref msg) => { - debug!( - target: "chain", - "Block {} at {} is unfit at this time: {}", - block.hash(), - block.header().height(), - msg - ); - Err(ErrorKind::Unfit(msg.clone()).into()) + if let Err(e) = self.save_block_height_processed(block_height) { + warn!(target: "chain", "Failed to save processed height {}: {}", block_height, e); } - _ => Err(e), - }, + Err(e) + } } } @@ -2817,6 +2832,10 @@ impl<'a> ChainUpdate<'a> { { debug!(target: "chain", "Process block {} at {}, approvals: {}, me: {:?}", block.hash(), block.header().height(), block.header().num_approvals(), me); + if block.chunks().len() != self.runtime_adapter.num_shards() as usize { + return Err(ErrorKind::IncorrectNumberOfChunkHeaders.into()); + } + // Check if we have already processed this block previously. self.check_known(block.header().hash())?; @@ -2882,9 +2901,9 @@ impl<'a> ChainUpdate<'a> { return Err(ErrorKind::InvalidRandomnessBeaconOutput.into()); } - if !block.check_validity() { + if let Err(e) = block.check_validity() { byzantine_assert!(false); - return Err(ErrorKind::Other("Invalid block".into()).into()); + return Err(e.into()); } let protocol_version = @@ -3134,8 +3153,6 @@ impl<'a> ChainUpdate<'a> { return Err(ErrorKind::InvalidApprovals.into()); }; - self.runtime_adapter.verify_block_signature(header)?; - let stakes = self .runtime_adapter .get_epoch_block_approvers_ordered(header.prev_hash())? diff --git a/chain/chain/src/error.rs b/chain/chain/src/error.rs index 8983ad35906..1a3312de63d 100644 --- a/chain/chain/src/error.rs +++ b/chain/chain/src/error.rs @@ -5,6 +5,7 @@ use chrono::{DateTime, Utc}; use failure::{Backtrace, Context, Fail}; use log::error; +use near_primitives::block::BlockValidityError; use near_primitives::challenge::{ChunkProofs, ChunkState}; use near_primitives::errors::{EpochError, StorageError}; use near_primitives::hash::CryptoHash; @@ -46,9 +47,6 @@ pub enum ErrorKind { /// Invalid block proposed signature. #[fail(display = "Invalid Block Proposer Signature")] InvalidBlockProposer, - /// Invalid block confirmation signature. - #[fail(display = "Invalid Block Confirmation Signature")] - InvalidBlockConfirmation, /// Invalid state root hash. #[fail(display = "Invalid State Root Hash")] InvalidStateRoot, @@ -76,6 +74,9 @@ pub enum ErrorKind { /// Invalid transactions in the block. #[fail(display = "Invalid Transactions")] InvalidTransactions, + /// Invalid Challenge Root (doesn't match actual challenge) + #[fail(display = "Invalid Challenge Root")] + InvalidChallengeRoot, /// Invalid challenge (wrong signature or format). 
#[fail(display = "Invalid Challenge")] InvalidChallenge, @@ -241,7 +242,6 @@ impl Error { | ErrorKind::InvalidBlockFutureTime(_) | ErrorKind::InvalidBlockHeight | ErrorKind::InvalidBlockProposer - | ErrorKind::InvalidBlockConfirmation | ErrorKind::InvalidChunk | ErrorKind::InvalidChunkProofs(_) | ErrorKind::InvalidChunkState(_) @@ -273,7 +273,8 @@ impl Error { | ErrorKind::InvalidStateRequest(_) | ErrorKind::InvalidRandomnessBeaconOutput | ErrorKind::InvalidBlockMerkleRoot - | ErrorKind::NotAValidator => true, + | ErrorKind::NotAValidator + | ErrorKind::InvalidChallengeRoot => true, } } @@ -315,3 +316,17 @@ impl From for Error { .into() } } + +impl From for Error { + fn from(error: BlockValidityError) -> Self { + match error { + BlockValidityError::InvalidStateRoot => ErrorKind::InvalidStateRoot, + BlockValidityError::InvalidReceiptRoot => ErrorKind::InvalidChunkReceiptsRoot, + BlockValidityError::InvalidTransactionRoot => ErrorKind::InvalidTxRoot, + BlockValidityError::InvalidChunkHeaderRoot => ErrorKind::InvalidChunkHeadersRoot, + BlockValidityError::InvalidNumChunksIncluded => ErrorKind::InvalidChunkMask, + BlockValidityError::InvalidChallengeRoot => ErrorKind::InvalidChallengeRoot, + } + .into() + } +} diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 3bb5143d800..7426a238dbb 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -38,11 +38,12 @@ use near_store::{ ColBlocksToCatchup, ColChallengedBlocks, ColChunkExtra, ColChunkHashesByHeight, ColChunkPerHeightShard, ColChunks, ColEpochLightClientBlocks, ColGCCount, ColIncomingReceipts, ColInvalidChunks, ColLastBlockWithNewChunk, ColNextBlockHashes, ColNextBlockWithNewChunk, - ColOutcomesByBlockHash, ColOutgoingReceipts, ColPartialChunks, ColReceiptIdToShardId, ColState, - ColStateChanges, ColStateDlInfos, ColStateHeaders, ColStateParts, ColTransactionRefCount, - ColTransactionResult, ColTransactions, ColTrieChanges, DBCol, KeyForStateChanges, ShardTries, - Store, StoreUpdate, TrieChanges, WrappedTrieChanges, CHUNK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, - LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, SHOULD_COL_GC, SYNC_HEAD_KEY, TAIL_KEY, + ColOutcomesByBlockHash, ColOutgoingReceipts, ColPartialChunks, ColProcessedBlockHeights, + ColReceiptIdToShardId, ColState, ColStateChanges, ColStateDlInfos, ColStateHeaders, + ColStateParts, ColTransactionRefCount, ColTransactionResult, ColTransactions, ColTrieChanges, + DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate, TrieChanges, WrappedTrieChanges, + CHUNK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, + SHOULD_COL_GC, SYNC_HEAD_KEY, TAIL_KEY, }; use crate::byzantine_assert; @@ -280,6 +281,8 @@ pub trait ChainStoreAccess { let block_hash = *self.get_block_hash_from_ordinal(block_ordinal)?; self.get_block_merkle_tree(&block_hash) } + + fn is_height_processed(&mut self, height: BlockHeight) -> Result; } /// All chain-related database operations. @@ -338,6 +341,8 @@ pub struct ChainStore { block_merkle_tree: SizedCache, PartialMerkleTree>, /// Cache of block ordinal to block hash. block_ordinal_to_hash: SizedCache, CryptoHash>, + /// Processed block heights. 
+    processed_block_heights: SizedCache<Vec<u8>, ()>,
 }
 
 pub fn option_to_not_found<T>(res: io::Result<Option<T>>, field_name: &str) -> Result<T, Error> {
@@ -378,6 +383,7 @@ impl ChainStore {
             transactions: SizedCache::with_size(CHUNK_CACHE_SIZE),
             block_merkle_tree: SizedCache::with_size(CACHE_SIZE),
             block_ordinal_to_hash: SizedCache::with_size(CACHE_SIZE),
+            processed_block_heights: SizedCache::with_size(CACHE_SIZE),
         }
     }
 
@@ -1071,6 +1077,17 @@ impl ChainStoreAccess for ChainStore {
             &format!("BLOCK ORDINAL: {}", block_ordinal),
         )
     }
+
+    fn is_height_processed(&mut self, height: BlockHeight) -> Result<bool, Error> {
+        read_with_cache(
+            &*self.store,
+            ColProcessedBlockHeights,
+            &mut self.processed_block_heights,
+            &index_to_bytes(height),
+        )
+        .map(|r| r.is_some())
+        .map_err(|e| e.into())
+    }
 }
 
 /// Cache update for ChainStore
@@ -1102,6 +1119,7 @@ struct ChainStoreCacheUpdate {
     block_merkle_tree: HashMap<CryptoHash, PartialMerkleTree>,
     block_ordinal_to_hash: HashMap<NumBlocks, CryptoHash>,
     gc_count: HashMap<DBCol, GCCount>,
+    processed_block_heights: HashSet<BlockHeight>,
 }
 
 impl ChainStoreCacheUpdate {
@@ -1592,6 +1610,14 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
             self.chain_store.get_block_hash_from_ordinal(block_ordinal)
         }
     }
+
+    fn is_height_processed(&mut self, height: BlockHeight) -> Result<bool, Error> {
+        if self.chain_store_cache_update.processed_block_heights.contains(&height) {
+            Ok(true)
+        } else {
+            self.chain_store.is_height_processed(height)
+        }
+    }
 }
 
 impl<'a> ChainStoreUpdate<'a> {
@@ -1876,6 +1902,10 @@ impl<'a> ChainStoreUpdate<'a> {
             .insert((height, shard_id), chunk_hash);
     }
 
+    pub fn save_block_height_processed(&mut self, height: BlockHeight) {
+        self.chain_store_cache_update.processed_block_heights.insert(height);
+    }
+
     pub fn inc_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<(), Error> {
         let refcount = match self.get_block_refcount(block_hash) {
             Ok(refcount) => refcount.clone(),
@@ -2133,16 +2163,20 @@ impl<'a> ChainStoreUpdate<'a> {
         if hashes.is_empty() {
             epoch_to_hashes.remove(epoch_id);
         }
+        let key = index_to_bytes(height);
         if epoch_to_hashes.is_empty() {
-            store_update.delete(ColBlockPerHeight, &index_to_bytes(height));
-            self.chain_store.block_hash_per_height.cache_remove(&index_to_bytes(height));
+            store_update.delete(ColBlockPerHeight, &key);
+            self.chain_store.block_hash_per_height.cache_remove(&key);
         } else {
-            store_update.set_ser(ColBlockPerHeight, &index_to_bytes(height), &epoch_to_hashes)?;
+            store_update.set_ser(ColBlockPerHeight, &key, &epoch_to_hashes)?;
             self.chain_store
                 .block_hash_per_height
                 .cache_set(index_to_bytes(height), epoch_to_hashes);
         }
         self.inc_gc(ColBlockPerHeight);
+        if self.is_height_processed(height)? {
+            self.gc_col(ColProcessedBlockHeights, &key);
+        }
         self.merge(store_update);
         Ok(())
     }
@@ -2285,6 +2319,10 @@ impl<'a> ChainStoreUpdate<'a> {
             DBCol::ColTransactionRefCount => {
                 store_update.delete(col, key);
             }
+            DBCol::ColProcessedBlockHeights => {
+                store_update.delete(col, key);
+                self.chain_store.processed_block_heights.cache_remove(key);
+            }
             DBCol::ColDbVersion
             | DBCol::ColBlockMisc
             | DBCol::ColBlockHeader
@@ -2593,6 +2631,9 @@ impl<'a> ChainStoreUpdate<'a> {
         for (chunk_hash, chunk) in self.chain_store_cache_update.invalid_chunks.iter() {
             store_update.set_ser(ColInvalidChunks, chunk_hash.as_ref(), chunk)?;
         }
+        for block_height in self.chain_store_cache_update.processed_block_heights.iter() {
+            store_update.set_ser(ColProcessedBlockHeights, &index_to_bytes(*block_height), &())?;
+        }
         for (col, mut gc_count) in self.chain_store_cache_update.gc_count.clone().drain() {
             if let Ok(Some(value)) = self.store().get_ser::<GCCount>(
                 ColGCCount,
@@ -2640,6 +2681,7 @@ impl<'a> ChainStoreUpdate<'a> {
             block_refcounts,
             block_merkle_tree,
             block_ordinal_to_hash,
+            processed_block_heights,
             ..
         } = self.chain_store_cache_update;
         for (hash, block) in blocks {
@@ -2733,6 +2775,9 @@ impl<'a> ChainStoreUpdate<'a> {
                 .block_ordinal_to_hash
                 .cache_set(index_to_bytes(block_ordinal), block_hash);
         }
+        for block_height in processed_block_heights {
+            self.chain_store.processed_block_heights.cache_set(index_to_bytes(block_height), ());
+        }
 
         Ok(())
     }
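The store changes above follow the existing write path: pending mutations accumulate in `ChainStoreCacheUpdate` and are flushed to the new `ColProcessedBlockHeights` column on commit, with reads consulting pending state first. A condensed sketch of that read-through/write-back shape, using a plain `HashMap`/`HashSet` in place of the real `Store` and `SizedCache` (assumed, simplified types):

```rust
use std::collections::{HashMap, HashSet};

type BlockHeight = u64;

// Stand-in for the persistent ColProcessedBlockHeights column.
struct Store {
    processed_heights: HashMap<Vec<u8>, ()>,
}

// Pending mutations, mirroring ChainStoreCacheUpdate.processed_block_heights.
struct StoreUpdate<'a> {
    store: &'a mut Store,
    pending: HashSet<BlockHeight>,
}

fn index_to_bytes(height: BlockHeight) -> Vec<u8> {
    height.to_le_bytes().to_vec()
}

impl<'a> StoreUpdate<'a> {
    // Reads consult pending writes first, then the backing store.
    fn is_height_processed(&self, height: BlockHeight) -> bool {
        self.pending.contains(&height)
            || self.store.processed_heights.contains_key(&index_to_bytes(height))
    }

    fn save_block_height_processed(&mut self, height: BlockHeight) {
        self.pending.insert(height);
    }

    // Commit flushes every pending height to the column in one batch.
    fn commit(self) {
        for height in self.pending {
            self.store.processed_heights.insert(index_to_bytes(height), ());
        }
    }
}
```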
diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs
index a3f75f0fcca..fd396bd54dd 100644
--- a/chain/chain/src/test_utils.rs
+++ b/chain/chain/src/test_utils.rs
@@ -270,16 +270,6 @@ impl RuntimeAdapter for KeyValueRuntime {
         self.tries.get_trie_for_shard(shard_id)
     }
 
-    fn verify_block_signature(&self, header: &BlockHeader) -> Result<(), Error> {
-        let validators = &self.validators
-            [self.get_epoch_and_valset(*header.prev_hash()).map_err(|err| err.to_string())?.1];
-        let validator = &validators[(header.height() as usize) % validators.len()];
-        if !header.verify_block_producer(&validator.public_key) {
-            return Err(ErrorKind::InvalidBlockProposer.into());
-        }
-        Ok(())
-    }
-
     fn verify_block_vrf(
         &self,
         _epoch_id: &EpochId,
@@ -302,8 +292,11 @@ impl RuntimeAdapter for KeyValueRuntime {
         Ok(true)
     }
 
-    fn verify_header_signature(&self, _header: &BlockHeader) -> Result<bool, Error> {
-        Ok(true)
+    fn verify_header_signature(&self, header: &BlockHeader) -> Result<bool, Error> {
+        let validators = &self.validators
+            [self.get_epoch_and_valset(*header.prev_hash()).map_err(|err| err.to_string())?.1];
+        let validator = &validators[(header.height() as usize) % validators.len()];
+        Ok(header.verify_block_producer(&validator.public_key))
     }
 
     fn verify_chunk_header_signature(&self, _header: &ShardChunkHeader) -> Result<bool, Error> {
diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs
index 51c135825b6..516eedb1c63 100644
--- a/chain/chain/src/types.rs
+++ b/chain/chain/src/types.rs
@@ -223,8 +223,6 @@ pub trait RuntimeAdapter: Send + Sync {
     /// Returns trie.
     fn get_trie_for_shard(&self, shard_id: ShardId) -> Trie;
 
-    /// Verify block producer validity
-    fn verify_block_signature(&self, header: &BlockHeader) -> Result<(), Error>;
-
     fn verify_block_vrf(
         &self,
         epoch_id: &EpochId,
diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs
index 314d068917d..32fe98d4fca 100644
--- a/chain/chunks/src/lib.rs
+++ b/chain/chunks/src/lib.rs
@@ -1226,6 +1226,12 @@ impl ShardsManager {
         chunk_entry: &EncodedChunksCacheEntry,
         store_update: &mut ChainStoreUpdate<'_>,
     ) {
+        let cares_about_shard = self.cares_about_shard_this_or_next_epoch(
+            self.me.as_ref(),
+            &chunk_entry.header.inner.prev_block_hash,
+            chunk_entry.header.inner.shard_id,
+            true,
+        );
         let prev_block_hash = chunk_entry.header.inner.prev_block_hash;
         let partial_chunk = PartialEncodedChunk {
             header: chunk_entry.header.clone(),
@@ -1233,7 +1239,11 @@ impl ShardsManager {
                 .parts
                 .iter()
                 .filter_map(|(part_ord, part_entry)| {
-                    if let Ok(need_part) = self.need_part(&prev_block_hash, *part_ord) {
+                    if cares_about_shard
+                        || self.need_part(&prev_block_hash, *part_ord).unwrap_or(false)
+                    {
+                        Some(part_entry.clone())
+                    } else if let Ok(need_part) = self.need_part(&prev_block_hash, *part_ord) {
                         if need_part {
                             Some(part_entry.clone())
                         } else {
@@ -1248,7 +1258,9 @@ impl ShardsManager {
                 .receipts
                 .iter()
                 .filter_map(|(shard_id, receipt)| {
-                    if self.need_receipt(&prev_block_hash, *shard_id) {
+                    if cares_about_shard || self.need_receipt(&prev_block_hash, *shard_id) {
+                        Some(receipt.clone())
+                    } else if self.need_receipt(&prev_block_hash, *shard_id) {
                         Some(receipt.clone())
                     } else {
                         None
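The `ShardsManager` change above is the fix for #2916: when a persisted chunk is reconstructed, a node that tracks the shard must keep every part and receipt it holds, because `need_part`/`need_receipt` answer "do I still need to fetch this?" rather than "should this be stored?". A hedged sketch of just that selection rule (simplified types; `cares_about_shard` stands in for `cares_about_shard_this_or_next_epoch`):

```rust
#[derive(Clone)]
struct PartEntry {
    ord: u64,
}

// Sketch of the part-selection rule, not the actual ShardsManager method.
fn select_parts(
    cares_about_shard: bool,
    parts: &[PartEntry],
    need_part: impl Fn(u64) -> Result<bool, ()>,
) -> Vec<PartEntry> {
    parts
        .iter()
        .filter(|part| {
            // A node tracking the shard keeps everything it holds; other nodes
            // keep only the parts they are still responsible for.
            cares_about_shard || need_part(part.ord).unwrap_or(false)
        })
        .cloned()
        .collect()
}
```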
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index 1a279cc4ef9..8b7cc3c09f3 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -585,6 +585,18 @@ impl Client {
         block: Block,
         provenance: Provenance,
     ) -> (Vec<AcceptedBlock>, Result<Option<Tip>, near_chain::Error>) {
+        let is_requested = match provenance {
+            Provenance::PRODUCED | Provenance::SYNC => true,
+            Provenance::NONE => false,
+        };
+        // drop the block if a) it is not requested and b) we already processed this height.
+        if !is_requested {
+            match self.chain.mut_store().is_height_processed(block.header().height()) {
+                Ok(true) => return (vec![], Ok(None)),
+                Ok(false) => {}
+                Err(e) => return (vec![], Err(e)),
+            }
+        }
         // TODO: replace to channels or cross beams here? we don't have multi-threading here so it's mostly to get around borrow checker.
         let accepted_blocks = Arc::new(RwLock::new(vec![]));
         let blocks_missing_chunks = Arc::new(RwLock::new(vec![]));
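The effect of the new gate in `Client::process_block` is that an unsolicited block at an already-processed height is dropped before any expensive work, while produced or explicitly requested blocks always go through. A compact model of the decision (stand-in `Provenance` and store lookup):

```rust
enum Provenance {
    Produced,
    Sync,
    None,
}

// Mirrors the gate above: only unrequested blocks (Provenance::None) are
// subject to the processed-height dedup.
fn should_process(
    provenance: Provenance,
    height: u64,
    is_height_processed: impl Fn(u64) -> bool,
) -> bool {
    let is_requested = matches!(provenance, Provenance::Produced | Provenance::Sync);
    is_requested || !is_height_processed(height)
}
```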
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index 6d0a24db8f4..0427b98168a 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -55,6 +55,9 @@ use crate::StatusResponse;
 const STATUS_WAIT_TIME_MULTIPLIER: u64 = 10;
 /// Drop blocks whose height are beyond head + horizon.
 const BLOCK_HORIZON: u64 = 500;
+/// How many intervals of max_block_production_delay to wait while several blocks behind before
+/// kicking off syncing.
+const SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER: u32 = 5;
 
 pub struct ClientActor {
     /// Adversarial controls
@@ -308,7 +311,8 @@ impl Handler<NetworkClientMessages> for ClientActor {
                         }
                     }
                 }
-                self.receive_block(block, peer_id, was_requested)
+                self.receive_block(block, peer_id, was_requested);
+                NetworkClientResponses::NoResponse
             } else {
                 match self
                     .client
@@ -324,7 +328,7 @@ impl Handler<NetworkClientMessages> for ClientActor {
                         }
                         _ => {}
                     }
-                    return NetworkClientResponses::NoResponse;
+                    NetworkClientResponses::NoResponse
                 }
             }
             NetworkClientMessages::BlockHeaders(headers, peer_id) => {
@@ -801,7 +805,8 @@ impl ClientActor {
         match self.client.produce_block(next_height) {
             Ok(Some(block)) => {
                 let block_hash = *block.hash();
-                let res = self.process_block(block, Provenance::PRODUCED);
+                let peer_id = self.node_id.clone();
+                let res = self.process_block(block, Provenance::PRODUCED, &peer_id);
                 match &res {
                     Ok(_) => Ok(()),
                     Err(e) => match e.kind() {
@@ -853,22 +858,33 @@ impl ClientActor {
         &mut self,
         block: Block,
         provenance: Provenance,
+        peer_id: &PeerId,
     ) -> Result<(), near_chain::Error> {
         // If we produced the block, send it out before we apply the block.
         // If we didn't produce the block and didn't request it, do basic validation
         // before sending it out.
         if provenance == Provenance::PRODUCED {
             self.network_adapter.do_send(NetworkRequests::Block { block: block.clone() });
-        } else if provenance == Provenance::NONE {
-            // Don't care about challenge here since it will be handled when we actually process
-            // the block.
-            if self.client.chain.process_block_header(&block.header(), |_| {}).is_ok() {
-                let head = self.client.chain.head()?;
-                // do not broadcast blocks that are too far back.
-                if head.height < block.header().height()
-                    || &head.epoch_id == block.header().epoch_id()
-                {
-                    self.client.rebroadcast_block(block.clone());
+        } else {
+            match self.client.chain.validate_block(&block) {
+                Ok(_) => {
+                    let head = self.client.chain.head()?;
+                    // do not broadcast blocks that are too far back.
+                    if (head.height < block.header().height()
+                        || &head.epoch_id == block.header().epoch_id())
+                        && provenance == Provenance::NONE
+                    {
+                        self.client.rebroadcast_block(block.clone());
+                    }
+                }
+                Err(e) => {
+                    if e.is_bad_data() {
+                        self.network_adapter.do_send(NetworkRequests::BanPeer {
+                            peer_id: peer_id.clone(),
+                            ban_reason: ReasonForBan::BadBlockHeader,
+                        });
+                        return Err(e);
+                    }
                 }
             }
         }
@@ -877,28 +893,23 @@ impl ClientActor {
         result.map(|_| ())
     }
 
-    /// Processes received block, returns boolean if block was reasonable or malicious.
-    fn receive_block(
-        &mut self,
-        block: Block,
-        peer_id: PeerId,
-        was_requested: bool,
-    ) -> NetworkClientResponses {
+    /// Processes a received block; bans the peer if the block header is invalid or the block is ill-formed.
+    fn receive_block(&mut self, block: Block, peer_id: PeerId, was_requested: bool) {
         let hash = *block.hash();
         debug!(target: "client", "{:?} Received block {} <- {} at {} from {}, requested: {}", self.client.validator_signer.as_ref().map(|vs| vs.validator_id()), hash, block.header().prev_hash(), block.header().height(), peer_id, was_requested);
         // drop the block if it is too far ahead
-        let head = unwrap_or_return!(self.client.chain.head(), NetworkClientResponses::NoResponse);
+        let head = unwrap_or_return!(self.client.chain.head());
         if block.header().height() >= head.height + BLOCK_HORIZON {
             debug!(target: "client", "dropping block {} that is too far ahead. Block height {} current head height {}", block.hash(), block.header().height(), head.height);
-            return NetworkClientResponses::NoResponse;
+            return;
         }
         let prev_hash = *block.header().prev_hash();
         let provenance =
             if was_requested { near_chain::Provenance::SYNC } else { near_chain::Provenance::NONE };
-        match self.process_block(block, provenance) {
-            Ok(_) => NetworkClientResponses::NoResponse,
+        match self.process_block(block, provenance, &peer_id) {
+            Ok(_) => {}
             Err(ref err) if err.is_bad_data() => {
-                NetworkClientResponses::Ban { ban_reason: ReasonForBan::BadBlock }
+                warn!(target: "client", "received bad block: {}", err);
             }
             Err(ref err) if err.is_error() => {
                 if self.client.sync_status.is_syncing() {
@@ -908,14 +919,12 @@ impl ClientActor {
                 } else {
                     error!(target: "client", "Error on receipt of block: {}", err);
                 }
-                NetworkClientResponses::NoResponse
             }
             Err(e) => match e.kind() {
                 near_chain::ErrorKind::Orphan => {
                     if !self.client.chain.is_orphan(&prev_hash) {
                         self.request_block_by_hash(prev_hash, peer_id)
                     }
-                    NetworkClientResponses::NoResponse
                 }
                 near_chain::ErrorKind::ChunksMissing(missing_chunks) => {
                     debug!(
@@ -927,11 +936,9 @@ impl ClientActor {
                         missing_chunks.iter().map(|header| header.chunk_hash()).collect::<Vec<_>>()
                     );
                     self.client.shards_mgr.request_chunks(missing_chunks);
-                    NetworkClientResponses::NoResponse
                 }
                 _ => {
                     debug!(target: "client", "Process block: block {} refused by chain: {}", hash, e.kind());
-                    NetworkClientResponses::NoResponse
                 }
             },
         }
@@ -977,7 +984,7 @@ impl ClientActor {
         let mut is_syncing = self.client.sync_status.is_syncing();
 
         let full_peer_info = if let Some(full_peer_info) =
-            highest_height_peer(&self.network_info.highest_height_peers)
+            highest_height_peer(&self.network_info.highest_height_peers, head.height)
         {
             full_peer_info
         } else {
@@ -1007,6 +1014,21 @@ impl ClientActor {
                     full_peer_info.chain_info.height,
                 );
                 is_syncing = true;
+            } else {
+                if let SyncStatus::NoSyncSeveralBlocksBehind { since_when, our_height } =
+                    self.client.sync_status
+                {
+                    let now = Utc::now();
+                    if now > since_when
+                        && (now - since_when).to_std().unwrap()
+                            >= self.client.config.max_block_production_delay
+                                * SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER
+                        && our_height == head.height
+                    {
+                        info!(target: "sync", "Have been at the same height for too long, while peers have newer blocks. Forcing synchronization");
+                        is_syncing = true;
+                    }
+                }
             }
         }
         Ok((is_syncing, full_peer_info.chain_info.height))
@@ -1130,6 +1152,27 @@ impl ClientActor {
                     self.check_send_announce_account(head.prev_block_hash);
                 }
                 wait_period = self.client.config.sync_check_period;
+
+                // Handle the case in which we failed to receive a block, and our inactivity prevents
+                // the network from making progress
+                if let Ok(head) = self.client.chain.head() {
+                    match self.client.sync_status {
+                        SyncStatus::NoSync => {
+                            if head.height < highest_height {
+                                self.client.sync_status = SyncStatus::NoSyncSeveralBlocksBehind {
+                                    since_when: Utc::now(),
+                                    our_height: head.height,
+                                }
+                            }
+                        }
+                        SyncStatus::NoSyncSeveralBlocksBehind { our_height, .. } => {
+                            if head.height > our_height {
+                                self.client.sync_status = SyncStatus::NoSync;
+                            }
+                        }
+                        _ => {}
+                    }
+                }
             } else {
                 // Run each step of syncing separately.
                 unwrap_or_run_later!(self.client.header_sync.run(
@@ -1199,7 +1242,7 @@ impl ClientActor {
                 self.client.sync_status = SyncStatus::StateSync(sync_hash, new_shard_sync);
                 if fetch_block {
                     if let Some(peer_info) =
-                        highest_height_peer(&self.network_info.highest_height_peers)
+                        highest_height_peer(&self.network_info.highest_height_peers, 0)
                     {
                         if let Ok(header) = self.client.chain.get_block_header(&sync_hash) {
                             for hash in
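The new `NoSyncSeveralBlocksBehind { since_when, our_height }` state acts as a watchdog: if the head stays at the same height for `SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER` intervals of `max_block_production_delay` while peers report newer blocks, sync is forced. A compact sketch of the timing check using `std::time` instead of the chrono types above:

```rust
use std::time::{Duration, Instant};

const SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER: u32 = 5;

struct Behind {
    since_when: Instant,
    our_height: u64,
}

// True when we have been stuck at `head_height` long enough that sync should
// be forced even though we are nominally "not syncing".
fn should_force_sync(
    state: &Behind,
    head_height: u64,
    max_block_production_delay: Duration,
    now: Instant,
) -> bool {
    state.our_height == head_height
        && now.duration_since(state.since_when)
            >= max_block_production_delay * SEVERAL_BLOCKS_BEHIND_WAIT_MULTIPLIER
}
```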
diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs
index 3ceee03b356..489cd8be2e4 100644
--- a/chain/client/src/info.rs
+++ b/chain/client/src/info.rs
@@ -168,7 +168,9 @@ fn display_sync_status(
 ) -> String {
     match sync_status {
         SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
-        SyncStatus::NoSync => format!("#{:>8} {:>44}", head.height, head.last_block_hash),
+        SyncStatus::NoSync | SyncStatus::NoSyncSeveralBlocksBehind { .. } => {
+            format!("#{:>8} {:>44}", head.height, head.last_block_hash)
+        }
         SyncStatus::HeaderSync { current_height, highest_height } => {
             let percent = if *highest_height <= genesis_height {
                 0
diff --git a/chain/client/src/sync.rs b/chain/client/src/sync.rs
index f47c19e1de2..49d58979bd5 100644
--- a/chain/client/src/sync.rs
+++ b/chain/client/src/sync.rs
@@ -9,7 +9,7 @@ use ansi_term::Color::{Purple, Yellow};
 use chrono::{DateTime, Duration, Utc};
 use futures::{future, FutureExt};
 use log::{debug, error, info, warn};
-use rand::seq::SliceRandom;
+use rand::seq::{IteratorRandom, SliceRandom};
 use rand::{thread_rng, Rng};
 
 use near_chain::types::BlockSyncResponse;
@@ -52,12 +52,22 @@ pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000;
 pub const NS_PER_SECOND: u128 = 1_000_000_000;
 
 /// Get random peer from the highest height peers.
-pub fn highest_height_peer(highest_height_peers: &Vec<FullPeerInfo>) -> Option<FullPeerInfo> {
+pub fn highest_height_peer(
+    highest_height_peers: &Vec<FullPeerInfo>,
+    min_height: BlockHeight,
+) -> Option<FullPeerInfo> {
     if highest_height_peers.len() == 0 {
         return None;
     }
-    let index = thread_rng().gen_range(0, highest_height_peers.len());
-    Some(highest_height_peers[index].clone())
+
+    match highest_height_peers
+        .iter()
+        .filter(|peer| peer.chain_info.height > min_height)
+        .choose(&mut thread_rng())
+    {
+        None => highest_height_peers.choose(&mut thread_rng()).cloned(),
+        Some(peer) => Some(peer.clone()),
+    }
 }
 
 /// Helper to keep track of sync headers.
@@ -112,7 +122,9 @@ impl HeaderSync {
             SyncStatus::HeaderSync { .. }
             | SyncStatus::BodySync { .. }
             | SyncStatus::StateSyncDone => true,
-            SyncStatus::NoSync | SyncStatus::AwaitingPeers => {
+            SyncStatus::NoSync
+            | SyncStatus::NoSyncSeveralBlocksBehind { .. }
+            | SyncStatus::AwaitingPeers => {
                 let sync_head = chain.sync_head()?;
                 debug!(target: "sync", "Sync: initial transition to Header sync. Sync head: {} at {}, resetting to {} at {}",
                     sync_head.last_block_hash, sync_head.height,
@@ -131,7 +143,7 @@ impl HeaderSync {
                 SyncStatus::HeaderSync { current_height: header_head.height, highest_height };
             let header_head = chain.header_head()?;
             self.syncing_peer = None;
-            if let Some(peer) = highest_height_peer(&highest_height_peers) {
+            if let Some(peer) = highest_height_peer(&highest_height_peers, 0) {
                 if peer.chain_info.height > header_head.height {
                     self.syncing_peer = self.request_headers(chain, peer);
                 }
@@ -169,9 +181,11 @@ impl HeaderSync {
                 // Did we receive as many headers as we expected from the peer? Request more or ban peer.
                 let stalling = header_head.height <= old_expected_height && now > timeout;
 
-                // Always enable header sync on initial state transition from NoSync / AwaitingPeers.
+                // Always enable header sync on initial state transition from NoSync / NoSyncSeveralBlocksBehind / AwaitingPeers.
                 let force_sync = match sync_status {
-                    SyncStatus::NoSync | SyncStatus::AwaitingPeers => true,
+                    SyncStatus::NoSync
+                    | SyncStatus::NoSyncSeveralBlocksBehind { .. }
+                    | SyncStatus::AwaitingPeers => true,
                     _ => false,
                 };
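`highest_height_peer` now prefers a random peer strictly above `min_height` and only falls back to the old "any highest-height peer" behavior when none qualifies. The shape of that filter-then-fallback selection with `rand`'s `IteratorRandom` (stand-in peer type):

```rust
use rand::seq::{IteratorRandom, SliceRandom};
use rand::thread_rng;

#[derive(Clone)]
struct PeerInfo {
    height: u64,
}

fn highest_height_peer(peers: &[PeerInfo], min_height: u64) -> Option<PeerInfo> {
    // Prefer a random peer strictly ahead of min_height...
    peers
        .iter()
        .filter(|peer| peer.height > min_height)
        .choose(&mut thread_rng())
        .cloned()
        // ...but fall back to any known peer rather than returning nothing.
        .or_else(|| peers.choose(&mut thread_rng()).cloned())
}
```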
diff --git a/chain/client/src/types.rs b/chain/client/src/types.rs
index 0e631963b6c..315e91b806e 100644
--- a/chain/client/src/types.rs
+++ b/chain/client/src/types.rs
@@ -121,6 +121,8 @@ pub enum SyncStatus {
     AwaitingPeers,
     /// Not syncing / Done syncing.
     NoSync,
+    /// Not syncing, but currently several blocks behind the highest-height peers.
+    NoSyncSeveralBlocksBehind { since_when: DateTime<Utc>, our_height: BlockHeight },
     /// Downloading block headers for fast sync.
     HeaderSync { current_height: BlockHeight, highest_height: BlockHeight },
     /// State sync, with different states of state sync for different shards.
@@ -140,7 +142,7 @@ impl SyncStatus {
     /// True if currently engaged in syncing the chain.
     pub fn is_syncing(&self) -> bool {
         match self {
-            SyncStatus::NoSync => false,
+            SyncStatus::NoSync | SyncStatus::NoSyncSeveralBlocksBehind { .. } => false,
             _ => true,
         }
     }
diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs
index 61f32316ca3..69b7b7f414d 100644
--- a/chain/client/src/view_client.rs
+++ b/chain/client/src/view_client.rs
@@ -903,13 +903,15 @@ impl Handler<NetworkViewClientMessages> for ViewClientActor {
             NetworkViewClientMessages::AnnounceAccount(announce_accounts) => {
                 let mut filtered_announce_accounts = Vec::new();
 
-                for (announce_account, last_epoch) in announce_accounts.into_iter() {
+                for (announce_account, last_epoch) in announce_accounts {
+                    // Keep the announcement if it is newer than the last announcement from
+                    // the same account.
                     if let Some(last_epoch) = last_epoch {
                         match self
                             .runtime_adapter
                             .compare_epoch_id(&announce_account.epoch_id, &last_epoch)
                         {
-                            Ok(Ordering::Less) => {}
+                            Ok(Ordering::Greater) => {}
                             _ => continue,
                         }
                     }
diff --git a/chain/client/tests/challenges.rs b/chain/client/tests/challenges.rs
index cd65fc2a48f..794b93b9c78 100644
--- a/chain/client/tests/challenges.rs
+++ b/chain/client/tests/challenges.rs
@@ -101,7 +101,7 @@ fn test_verify_block_double_sign_challenge() {
     assert!(validate_challenge(&*runtime_adapter, &epoch_id, &genesis.hash(), &invalid_challenge,)
         .is_err());
 
-    let (_, result) = env.clients[0].process_block(b2, Provenance::NONE);
+    let (_, result) = env.clients[0].process_block(b2, Provenance::SYNC);
     assert!(result.is_ok());
     let mut last_message = env.network_adapters[0].pop().unwrap();
     if let NetworkRequests::Block { .. } = last_message {
diff --git a/chain/client/tests/process_blocks.rs b/chain/client/tests/process_blocks.rs
index eaab5f90b21..01ab31532d3 100644
--- a/chain/client/tests/process_blocks.rs
+++ b/chain/client/tests/process_blocks.rs
@@ -19,13 +19,13 @@ use near_chunks::{ChunkStatus, ShardsManager};
 use near_client::test_utils::{create_chunk_on_height, setup_mock_all_validators};
 use near_client::test_utils::{setup_client, setup_mock, TestEnv};
 use near_client::{Client, GetBlock, GetBlockWithMerkleTree};
-use near_crypto::{InMemorySigner, KeyType, Signature, Signer};
+use near_crypto::{InMemorySigner, KeyType, PublicKey, Signature, Signer};
 use near_logger_utils::init_test_logger;
 #[cfg(feature = "metric_recorder")]
 use near_network::recorder::MetricRecorder;
 use near_network::routing::EdgeInfo;
 use near_network::test_utils::{wait_or_panic, MockNetworkAdapter};
-use near_network::types::{NetworkInfo, PeerChainInfo};
+use near_network::types::{NetworkInfo, PeerChainInfo, ReasonForBan};
 use near_network::{
     FullPeerInfo, NetworkClientMessages, NetworkClientResponses, NetworkRequests, NetworkResponses,
     PeerInfo,
@@ -33,12 +33,12 @@ use near_network::{
 use near_primitives::block::{Approval, ApprovalInner};
 use near_primitives::errors::InvalidTxError;
 use near_primitives::hash::{hash, CryptoHash};
-use near_primitives::merkle::{merklize, verify_hash};
+use near_primitives::merkle::verify_hash;
 use near_primitives::sharding::{EncodedShardChunk, ReedSolomonWrapper};
 use near_primitives::transaction::{
     Action, DeployContractAction, FunctionCallAction, SignedTransaction, Transaction,
 };
-use near_primitives::types::{BlockHeight, EpochId, MerkleHash, NumBlocks};
+use near_primitives::types::{BlockHeight, EpochId, NumBlocks, ValidatorStake};
 use near_primitives::utils::to_timestamp;
 use near_primitives::validator_signer::{InMemoryValidatorSigner, ValidatorSigner};
 use near_primitives::version::PROTOCOL_VERSION;
@@ -362,6 +362,129 @@ fn produce_block_with_approvals_arrived_early() {
 }
 
 /// Sends one invalid block followed by one valid block, and checks that client announces only valid block.
+/// It also checks that the node bans the peer for an invalid block header.
+fn invalid_blocks_common(is_requested: bool) {
+    init_test_logger();
+    System::run(move || {
+        let mut ban_counter = 0;
+        let (client, view_client) = setup_mock(
+            vec!["test"],
+            "other",
+            false,
+            false,
+            Box::new(move |msg, _ctx, _client_actor| {
+                match msg {
+                    NetworkRequests::Block { block } => {
+                        if is_requested {
+                            panic!("rebroadcasting requested block");
+                        } else {
+                            assert_eq!(block.header().height(), 1);
+                            assert_eq!(block.header().chunk_mask().len(), 1);
+                            assert_eq!(ban_counter, 1);
+                            System::current().stop();
+                        }
+                    }
+                    NetworkRequests::BanPeer { ban_reason, .. } => {
+                        assert_eq!(ban_reason, &ReasonForBan::BadBlockHeader);
+                        ban_counter += 1;
+                        if ban_counter == 2 && is_requested {
+                            System::current().stop();
+                        }
+                    }
+                    _ => {}
+                };
+                NetworkResponses::NoResponse
+            }),
+        );
+        actix::spawn(view_client.send(GetBlockWithMerkleTree::latest()).then(move |res| {
+            let (last_block, mut block_merkle_tree) = res.unwrap().unwrap();
+            let signer = InMemoryValidatorSigner::from_seed("test", KeyType::ED25519, "test");
+            // Send block with invalid chunk mask
+            let mut block = Block::produce(
+                PROTOCOL_VERSION,
+                &last_block.header.clone().into(),
+                last_block.header.height + 1,
+                last_block.chunks.iter().cloned().map(Into::into).collect(),
+                EpochId::default(),
+                if last_block.header.prev_hash == CryptoHash::default() {
+                    EpochId(last_block.header.hash)
+                } else {
+                    EpochId(last_block.header.next_epoch_id.clone())
+                },
+                vec![],
+                Rational::from_integer(0),
+                0,
+                100,
+                Some(0),
+                vec![],
+                vec![],
+                &signer,
+                last_block.header.next_bp_hash,
+                CryptoHash::default(),
+            );
+            block.mut_header().get_mut().inner_rest.chunk_mask = vec![];
+            client.do_send(NetworkClientMessages::Block(
+                block.clone(),
+                PeerInfo::random().id,
+                is_requested,
+            ));
+
+            // Send proper block.
+            block_merkle_tree.insert(last_block.header.hash);
+            let block2 = Block::produce(
+                PROTOCOL_VERSION,
+                &last_block.header.clone().into(),
+                last_block.header.height + 1,
+                last_block.chunks.into_iter().map(Into::into).collect(),
+                EpochId::default(),
+                if last_block.header.prev_hash == CryptoHash::default() {
+                    EpochId(last_block.header.hash)
+                } else {
+                    EpochId(last_block.header.next_epoch_id.clone())
+                },
+                vec![],
+                Rational::from_integer(0),
+                0,
+                100,
+                Some(0),
+                vec![],
+                vec![],
+                &signer,
+                last_block.header.next_bp_hash,
+                block_merkle_tree.root(),
+            );
+            client.do_send(NetworkClientMessages::Block(
+                block2.clone(),
+                PeerInfo::random().id,
+                is_requested,
+            ));
+            if is_requested {
+                let mut block3 = block2.clone();
+                block3.mut_header().get_mut().inner_rest.chunk_headers_root = hash(&[1]);
+                block3.mut_header().get_mut().init();
+                client.do_send(NetworkClientMessages::Block(
+                    block3.clone(),
+                    PeerInfo::random().id,
+                    is_requested,
+                ));
+            }
+            future::ready(())
+        }));
+        near_network::test_utils::wait_or_panic(5000);
+    })
+    .unwrap();
+}
+
+#[test]
+fn test_invalid_blocks_not_requested() {
+    invalid_blocks_common(false);
+}
+
+#[test]
+fn test_invalid_blocks_requested() {
+    invalid_blocks_common(true);
+}
+
 #[test]
 fn invalid_blocks() {
     init_test_logger();
@@ -375,10 +498,7 @@ fn invalid_blocks() {
                     match msg {
                         NetworkRequests::Block { block } => {
                             assert_eq!(block.header().height(), 1);
-                            assert_eq!(
-                                block.header().prev_state_root(),
-                                &merklize(&vec![MerkleHash::default()]).0
-                            );
+                            assert_eq!(block.header().chunk_mask().len(), 1);
                             System::current().stop();
                         }
                         _ => {}
@@ -451,6 +571,140 @@ fn invalid_blocks() {
     .unwrap();
 }
 
+enum InvalidBlockMode {
+    /// Header is invalid
+    InvalidHeader,
+    /// Block is ill-formed (roots check fail)
+    IllFormed,
+    /// Block is invalid for other reasons
+    InvalidBlock,
+}
+fn ban_peer_for_invalid_block_common(mode: InvalidBlockMode) {
+    init_test_logger();
+    let validators = vec![vec!["test1", "test2", "test3", "test4"]];
+    let key_pairs =
+        vec![PeerInfo::random(), PeerInfo::random(), PeerInfo::random(), PeerInfo::random()];
+    let validator_signer1 = InMemoryValidatorSigner::from_seed("test1", KeyType::ED25519, "test1");
+    System::run(move || {
+        let mut ban_counter = 0;
+        let network_mock: Arc<
+            RwLock<Box<dyn FnMut(String, &NetworkRequests) -> (NetworkResponses, bool)>>,
+        > = Arc::new(RwLock::new(Box::new(|_: String, _: &NetworkRequests| {
+            (NetworkResponses::NoResponse, true)
+        })));
+        let (_, conns) = setup_mock_all_validators(
+            validators.clone(),
+            key_pairs,
+            1,
+            true,
+            100,
+            false,
+            false,
+            100,
+            true,
+            vec![false; validators.iter().map(|x| x.len()).sum()],
+            network_mock.clone(),
+        );
+        *network_mock.write().unwrap() =
+            Box::new(move |_: String, msg: &NetworkRequests| -> (NetworkResponses, bool) {
+                match msg {
+                    NetworkRequests::Block { block } => {
+                        if block.header().height() == 4 {
+                            let mut block_mut = block.clone();
+                            match mode {
+                                InvalidBlockMode::InvalidHeader => {
+                                    // produce an invalid block with invalid header.
+                                    block_mut.mut_header().get_mut().inner_rest.chunk_mask = vec![];
+                                    block_mut.mut_header().resign(&validator_signer1);
+                                }
+                                InvalidBlockMode::IllFormed => {
+                                    // produce an ill-formed block
+                                    block_mut
+                                        .mut_header()
+                                        .get_mut()
+                                        .inner_rest
+                                        .chunk_headers_root = hash(&[1]);
+                                    block_mut.mut_header().resign(&validator_signer1);
+                                }
+                                InvalidBlockMode::InvalidBlock => {
+                                    // produce an invalid block whose invalidity cannot be verified by just
+                                    // having its header.
+                                    block_mut
+                                        .mut_header()
+                                        .get_mut()
+                                        .inner_rest
+                                        .validator_proposals = vec![ValidatorStake {
+                                        account_id: "test1".to_string(),
+                                        public_key: PublicKey::empty(KeyType::ED25519),
+                                        stake: 0,
+                                    }];
+                                    block_mut.mut_header().resign(&validator_signer1);
+                                }
+                            }
+
+                            for (i, (client, _)) in conns.clone().into_iter().enumerate() {
+                                if i > 0 {
+                                    client.do_send(NetworkClientMessages::Block(
+                                        block_mut.clone(),
+                                        PeerInfo::random().id,
+                                        false,
+                                    ))
+                                }
+                            }
+
+                            return (NetworkResponses::NoResponse, false);
+                        }
+                        if block.header().height() > 20 {
+                            match mode {
+                                InvalidBlockMode::InvalidHeader | InvalidBlockMode::IllFormed => {
+                                    assert_eq!(ban_counter, 3);
+                                }
+                                _ => {}
+                            }
+                            System::current().stop();
+                        }
+                        (NetworkResponses::NoResponse, true)
+                    }
+                    NetworkRequests::BanPeer { peer_id, ban_reason } => match mode {
+                        InvalidBlockMode::InvalidHeader | InvalidBlockMode::IllFormed => {
+                            assert_eq!(ban_reason, &ReasonForBan::BadBlockHeader);
+                            ban_counter += 1;
+                            if ban_counter > 3 {
+                                panic!("more bans than expected");
+                            }
+                            (NetworkResponses::NoResponse, true)
+                        }
+                        InvalidBlockMode::InvalidBlock => {
+                            panic!("banning peer {:?} unexpectedly for {:?}", peer_id, ban_reason);
+                        }
+                    },
+                    _ => (NetworkResponses::NoResponse, true),
+                }
+            });
+
+        near_network::test_utils::wait_or_panic(10000);
+    })
+    .unwrap();
+}
+
+/// If a peer sends a block whose header is valid and passes basic validation, the peer is not banned.
+#[test]
+fn test_not_ban_peer_for_invalid_block() {
+    ban_peer_for_invalid_block_common(InvalidBlockMode::InvalidBlock);
+}
+
+/// If a peer sends a block whose header is invalid, we should ban them and not forward the block.
+#[test]
+fn test_ban_peer_for_invalid_block_header() {
+    ban_peer_for_invalid_block_common(InvalidBlockMode::InvalidHeader);
+}
+
+/// If a peer sends a block that is ill-formed, we should ban them and not forward the block.
+#[test]
+fn test_ban_peer_for_ill_formed_block() {
+    ban_peer_for_invalid_block_common(InvalidBlockMode::IllFormed);
+}
+
 /// Runs two validators runtime with only one validator online.
 /// Present validator produces blocks on its height after deadline.
 #[test]
@@ -1449,6 +1703,25 @@ fn test_sync_hash_validity() {
     }
 }
 
+/// Only process one block per height
+#[test]
+fn test_not_process_height_twice() {
+    let mut env = TestEnv::new(ChainGenesis::test(), 1, 1);
+    let block = env.clients[0].produce_block(1).unwrap().unwrap();
+    let mut invalid_block = block.clone();
+    env.process_block(0, block, Provenance::PRODUCED);
+    let validator_signer = InMemoryValidatorSigner::from_seed("test0", KeyType::ED25519, "test0");
+    invalid_block.mut_header().get_mut().inner_rest.validator_proposals = vec![ValidatorStake {
+        account_id: "test1".to_string(),
+        public_key: PublicKey::empty(KeyType::ED25519),
+        stake: 0,
+    }];
+    invalid_block.mut_header().resign(&validator_signer);
+    let (accepted_blocks, res) = env.clients[0].process_block(invalid_block, Provenance::NONE);
+    assert!(accepted_blocks.is_empty());
+    assert!(matches!(res, Ok(None)));
+}
+
 #[test]
 fn test_validate_chunk_extra() {
     let epoch_length = 5;
diff --git a/chain/jsonrpc/client/src/lib.rs b/chain/jsonrpc/client/src/lib.rs
index 5ad92777b94..5e47508abfd 100644
--- a/chain/jsonrpc/client/src/lib.rs
+++ b/chain/jsonrpc/client/src/lib.rs
@@ -183,6 +183,8 @@ jsonrpc_client!(pub struct JsonRpcClient {
     pub fn broadcast_tx_commit(&self, tx: String) -> RpcRequest<FinalExecutionOutcomeView>;
     pub fn status(&self) -> RpcRequest<StatusResponse>;
     #[allow(non_snake_case)]
+    pub fn EXPERIMENTAL_check_tx(&self, tx: String) -> RpcRequest<serde_json::Value>;
+    #[allow(non_snake_case)]
     pub fn EXPERIMENTAL_genesis_config(&self) -> RpcRequest<serde_json::Value>;
     pub fn health(&self) -> RpcRequest<()>;
     pub fn tx(&self, hash: String, account_id: String) -> RpcRequest<FinalExecutionOutcomeView>;
diff --git a/chain/jsonrpc/src/lib.rs b/chain/jsonrpc/src/lib.rs
index c418319af42..a1fbe5a15a9 100644
--- a/chain/jsonrpc/src/lib.rs
+++ b/chain/jsonrpc/src/lib.rs
@@ -51,25 +51,6 @@ mod metrics;
 const JSON_PAYLOAD_MAX_SIZE: usize = 2 * 1024 * 1024;
 const QUERY_DATA_MAX_SIZE: usize = 10 * 1024;
 
-/// RPCs that should only be called when the node is synced
-const RPC_REQUIRES_SYNC: [&'static str; 15] = [
-    "broadcast_tx_async",
-    "EXPERIMENTAL_broadcast_tx_sync",
-    "broadcast_tx_commit",
-    "validators",
-    "query",
-    "health",
-    "tx",
-    "block",
-    "chunk",
-    "EXPERIMENTAL_changes",
-    "EXPERIMENTAL_changes_in_block",
-    "next_light_client_block",
-    "EXPERIMENTAL_light_client_proof",
-    "light_client_proof",
-    "gas_price",
-];
-
 #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
 pub struct RpcPollingConfig {
     pub polling_interval: Duration,
@@ -150,7 +131,6 @@ pub enum ServerError {
     Timeout,
     Closed,
     InternalError,
-    IsSyncing,
 }
 
 impl Display for ServerError {
@@ -160,7 +140,6 @@ impl Display for ServerError {
             ServerError::Timeout => write!(f, "ServerError: Timeout"),
             ServerError::Closed => write!(f, "ServerError: Closed"),
             ServerError::InternalError => write!(f, "ServerError: Internal Error"),
-            ServerError::IsSyncing => write!(f, "ServerError: Node is syncing"),
         }
     }
 }
@@ -232,22 +211,11 @@ impl JsonRpcHandler {
             }
         }
 
-        if RPC_REQUIRES_SYNC.iter().find(|&&s| s == &request.method).is_some() {
-            match self.client_addr.send(Status { is_health_check: false }).await {
-                Ok(Ok(result)) => {
-                    if result.sync_info.syncing {
-                        return Err(ServerError::IsSyncing.into());
-                    }
-                }
-                Ok(Err(err)) => return Err(RpcError::new(-32_001, err, None)),
-                Err(_) => return Err(RpcError::server_error::<()>(None)),
-            }
-        }
-
         match request.method.as_ref() {
             "broadcast_tx_async" => self.send_tx_async(request.params).await,
             "EXPERIMENTAL_broadcast_tx_sync" => self.send_tx_sync(request.params).await,
            "broadcast_tx_commit" =>
                self.send_tx_commit(request.params).await,
+            "EXPERIMENTAL_check_tx" => self.check_tx(request.params).await,
             "validators" => self.validators(request.params).await,
             "query" => self.query(request.params).await,
             "health" => self.health().await,
@@ -281,6 +249,39 @@ impl JsonRpcHandler {
         Ok(Value::String(hash))
     }
 
+    async fn tx_exists(
+        &self,
+        tx_hash: CryptoHash,
+        signer_account_id: &AccountId,
+    ) -> Result<bool, ServerError> {
+        timeout(self.polling_config.polling_timeout, async {
+            loop {
+                // TODO(optimization): Introduce a view_client method to only get transaction
+                // status without the information about execution outcomes.
+                match self
+                    .view_client_addr
+                    .send(TxStatus { tx_hash, signer_account_id: signer_account_id.clone() })
+                    .await
+                {
+                    Ok(Ok(Some(_))) => {
+                        return Ok(true);
+                    }
+                    Ok(Err(TxStatusError::MissingTransaction(_))) => {
+                        return Ok(false);
+                    }
+                    Err(_) => return Err(ServerError::InternalError),
+                    _ => {}
+                }
+                delay_for(self.polling_config.polling_interval).await;
+            }
+        })
+        .await
+        .map_err(|_| {
+            near_metrics::inc_counter(&metrics::RPC_TIMEOUT_TOTAL);
+            ServerError::Timeout
+        })?
+    }
+
     async fn tx_status_fetch(
         &self,
         tx_info: TransactionInfo,
     ) -> Result<FinalExecutionOutcomeView, TxStatusError> {
@@ -343,12 +344,17 @@ impl JsonRpcHandler {
         })?
     }
 
+    /// Send a transaction idempotently (subsequent sends of the same transaction will not cause
+    /// any new side effects and the result will be the same unless we garbage collected it
+    /// already).
     async fn send_tx(
         &self,
         tx: SignedTransaction,
         check_only: bool,
     ) -> Result<NetworkClientResponses, RpcError> {
-        Ok(self
+        let tx_hash = tx.get_hash();
+        let signer_account_id = tx.transaction.signer_id.clone();
+        let response = self
             .client_addr
             .send(NetworkClientMessages::Transaction {
                 transaction: tx,
@@ -356,13 +362,31 @@ impl JsonRpcHandler {
                 check_only,
             })
             .map_err(|err| RpcError::server_error(Some(ServerError::from(err))))
-            .await?)
+            .await?;
+
+        // If we receive an InvalidNonce error, it might be the case that the transaction was
+        // resubmitted, and we should check if that is the case and return a ValidTx response to
+        // maintain idempotence of the send_tx method.
+        if let NetworkClientResponses::InvalidTx(
+            near_primitives::errors::InvalidTxError::InvalidNonce { .. },
+        ) = response
+        {
+            if self.tx_exists(tx_hash, &signer_account_id).await? {
+                return Ok(NetworkClientResponses::ValidTx);
+            }
+        }
+
+        Ok(response)
     }
 
     async fn send_tx_sync(&self, params: Option<Value>) -> Result<Value, RpcError> {
         self.send_or_check_tx(params, false).await
     }
 
+    async fn check_tx(&self, params: Option<Value>) -> Result<Value, RpcError> {
+        self.send_or_check_tx(params, true).await
+    }
+
     async fn send_or_check_tx(
         &self,
         params: Option<Value>,
diff --git a/chain/jsonrpc/tests/rpc_transactions.rs b/chain/jsonrpc/tests/rpc_transactions.rs
index ed5763a1a32..e30a894ac22 100644
--- a/chain/jsonrpc/tests/rpc_transactions.rs
+++ b/chain/jsonrpc/tests/rpc_transactions.rs
@@ -201,3 +201,28 @@ fn test_tx_status_missing_tx() {
         }
     });
 }
+
+#[test]
+fn test_check_invalid_tx() {
+    test_with_client!(test_utils::NodeType::Validator, client, async move {
+        let signer = InMemorySigner::from_seed("test1", KeyType::ED25519, "test1");
+        // invalid base hash
+        let tx = SignedTransaction::send_money(
+            1,
+            "test1".to_string(),
+            "test2".to_string(),
+            &signer,
+            100,
+            hash(&[1]),
+        );
+        let bytes = tx.try_to_vec().unwrap();
+        match client.EXPERIMENTAL_check_tx(to_base64(&bytes)).await {
+            Err(e) => {
+                let s = serde_json::to_string(&e.data.unwrap()).unwrap();
+                println!("{}", s);
+                assert_eq!(s, "{\"TxExecutionError\":{\"InvalidTxError\":\"Expired\"}}");
+            }
+            Ok(_) => panic!("transaction should not succeed"),
+        }
+    });
+}
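From a client's point of view, the `send_tx` change makes resubmission safe: a duplicate submission that trips `InvalidNonce` is checked against the transaction store, and if the transaction is already known the node reports it as valid. A hedged sketch of that recovery rule with hypothetical response/error types:

```rust
enum TxError {
    InvalidNonce,
}

enum TxResponse {
    ValidTx,
    InvalidTx(TxError),
}

// `tx_exists` stands in for the view-client lookup by hash and signer.
fn reconcile_resubmission(response: TxResponse, tx_exists: impl Fn() -> bool) -> TxResponse {
    match response {
        // An InvalidNonce for a transaction we already know about just means
        // this was a duplicate submission; report success for idempotence.
        TxResponse::InvalidTx(TxError::InvalidNonce) if tx_exists() => TxResponse::ValidTx,
        other => other,
    }
}
```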
             return false;
+            return Err(InvalidTransactionRoot);
         }
 
         // Check that chunk included root stored in the header matches the chunk included root of the chunks
-        let chunks_included_root =
+        let num_chunks_included =
             Block::compute_chunks_included(self.chunks(), self.header().height());
-        if self.header().chunks_included() != chunks_included_root {
-            return false;
+        if self.header().chunks_included() != num_chunks_included {
+            return Err(InvalidNumChunksIncluded);
         }
 
         // Check that challenges root stored in the header matches the challenges root of the challenges
         let challenges_root = Block::compute_challenges_root(&self.challenges());
         if self.header().challenges_root() != &challenges_root {
-            return false;
+            return Err(InvalidChallengeRoot);
         }
 
-        true
+        Ok(())
     }
 }
diff --git a/core/primitives/src/version.rs b/core/primitives/src/version.rs
index e5a139fd4f4..eed07d56093 100644
--- a/core/primitives/src/version.rs
+++ b/core/primitives/src/version.rs
@@ -12,7 +12,7 @@ pub struct Version {
 pub type DbVersion = u32;
 
 /// Current version of the database.
-pub const DB_VERSION: DbVersion = 4;
+pub const DB_VERSION: DbVersion = 5;
 
 /// Protocol version type.
 pub type ProtocolVersion = u32;
diff --git a/core/store/src/db.rs b/core/store/src/db.rs
index 25a9583a7f7..ef92729c48b 100644
--- a/core/store/src/db.rs
+++ b/core/store/src/db.rs
@@ -99,10 +99,12 @@ pub enum DBCol {
     /// GC helper column to get all Outcome ids by Block Hash
     ColOutcomesByBlockHash = 42,
     ColTransactionRefCount = 43,
+    /// Heights of blocks that have been processed
+    ColProcessedBlockHeights = 44,
 }
 
 // Do not move this line from enum DBCol
-pub const NUM_COLS: usize = 44;
+pub const NUM_COLS: usize = 45;
 
 impl std::fmt::Display for DBCol {
     fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
@@ -151,6 +153,7 @@ impl std::fmt::Display for DBCol {
             Self::ColGCCount => "gc count",
             Self::ColOutcomesByBlockHash => "outcomes by block hash",
             Self::ColTransactionRefCount => "refcount per transaction",
+            Self::ColProcessedBlockHeights => "processed block heights",
         };
         write!(formatter, "{}", desc)
     }
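Adding `ColProcessedBlockHeights` follows the usual nearcore migration recipe: bump `DB_VERSION`, extend the column enum and `NUM_COLS`, and add a `db_version <= N` step to `apply_store_migrations` (here with no backfill, since reprocessing a height is harmless). The general shape of such a versioned migration step, sketched with stand-in store functions:

```rust
// Stand-ins for the real store handles; only the control flow matters here.
struct Store {
    version: u32,
}

const DB_VERSION: u32 = 5;

fn get_store_version(store: &Store) -> u32 {
    store.version
}

fn set_store_version(store: &mut Store, v: u32) {
    store.version = v;
}

fn apply_store_migrations(store: &mut Store) {
    if get_store_version(store) <= 4 {
        // version 4 => 5: add ColProcessedBlockHeights. No backfill needed:
        // at worst some heights get processed a second time.
        set_store_version(store, 5);
    }
    debug_assert_eq!(get_store_version(store), DB_VERSION);
}
```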
diff --git a/neard/src/config.rs b/neard/src/config.rs
index 5217247dc5b..d6c272b7d6a 100644
--- a/neard/src/config.rs
+++ b/neard/src/config.rs
@@ -966,9 +966,9 @@ pub fn download_genesis(url: &String, path: &PathBuf) {
         // In case where the genesis is bigger than the specified limit Overflow Error is thrown
         let body = response
             .body()
-            .limit(250_000_000)
+            .limit(2_500_000_000)
             .await
-            .expect("Genesis file is bigger than 250MB. Please make the limit higher.");
+            .expect("Genesis file is bigger than 2.5GB. Please make the limit higher.");
 
         std::fs::write(&path, &body).expect("Failed to create / write a config file.");
diff --git a/neard/src/lib.rs b/neard/src/lib.rs
index e6abc2b0322..ff8836f0d01 100644
--- a/neard/src/lib.rs
+++ b/neard/src/lib.rs
@@ -89,6 +89,13 @@ pub fn apply_store_migrations(path: &String) {
         fill_col_transaction_refcount(&store);
         set_store_version(&store, 4);
     }
+    if db_version <= 4 {
+        // version 4 => 5: add ColProcessedBlockHeights
+        // we don't need to backfill the old heights since at worst we will just process some heights
+        // again.
+        let store = create_store(&path);
+        set_store_version(&store, 5);
+    }
 
     let db_version = get_store_version(path);
     debug_assert_eq!(db_version, near_primitives::version::DB_VERSION);
diff --git a/neard/src/runtime.rs b/neard/src/runtime.rs
index 0e10f747911..10b48638feb 100644
--- a/neard/src/runtime.rs
+++ b/neard/src/runtime.rs
@@ -497,16 +497,6 @@ impl RuntimeAdapter for NightshadeRuntime {
         self.tries.get_trie_for_shard(shard_id)
     }
 
-    fn verify_block_signature(&self, header: &BlockHeader) -> Result<(), Error> {
-        let mut epoch_manager = self.epoch_manager.as_ref().write().expect(POISONED_LOCK_ERR);
-        let validator =
-            epoch_manager.get_block_producer_info(header.epoch_id(), header.height())?;
-        if !header.verify_block_producer(&validator.public_key) {
-            return Err(ErrorKind::InvalidBlockProposer.into());
-        }
-        Ok(())
-    }
-
     fn verify_block_vrf(
         &self,
         epoch_id: &EpochId,
diff --git a/nightly/nightly.txt b/nightly/nightly.txt
index 7264394a11c..99464fa151d 100644
--- a/nightly/nightly.txt
+++ b/nightly/nightly.txt
@@ -38,6 +38,8 @@ pytest --timeout=300 sanity/gc_sync_after_sync.py
 pytest --timeout=300 sanity/gc_sync_after_sync.py swap_nodes
 pytest --timeout=300 sanity/large_messages.py
 pytest sanity/controlled_edge_nonce.py
+pytest sanity/repro_2916.py
+pytest --timeout=240 sanity/switch_node_key.py
 # TODO: re-enable after #2949 is fixed
 # pytest --timeout=240 sanity/validator_switch_key.py
 pytest sanity/proxy_simple.py
@@ -53,7 +55,7 @@ pytest --timeout=240 contracts/gibberish.py
 
 # python stress tests
 # pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network
-# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart
+pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart
 # pytest --timeout=2000 stress/stress.py 3 2 4 0 staking transactions node_set
 # pytest stress/network_stress.py
 
diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py
index fc6db3b808e..cd30d6f165c 100644
--- a/pytest/lib/cluster.py
+++ b/pytest/lib/cluster.py
@@ -15,6 +15,7 @@
 import retrying
 import rc
 from rc import gcloud
+import traceback
 import uuid
 import network
 from proxy import NodesProxy
@@ -37,6 +38,7 @@ def atexit_cleanup(node):
         node.cleanup()
     except:
         print("Cleaning failed!")
+        traceback.print_exc()
         pass
 
@@ -276,6 +278,7 @@ def __init__(self,
         config_json['network']['addr'] = '0.0.0.0:%s' % port
         config_json['network']['blacklist'] = blacklist
         config_json['rpc']['addr'] = '0.0.0.0:%s' % rpc_port
+        config_json['rpc']['metrics_addr'] = '0.0.0.0:%s' % (rpc_port + 1000)
         config_json['consensus']['min_num_peers'] = 1
         with open(os.path.join(node_dir, "config.json"), 'w') as f:
             f.write(json.dumps(config_json, indent=2))
@@ -346,10 +349,22 @@ def reset_validator_key(self, new_key):
         with open(os.path.join(self.node_dir, "validator_key.json"), 'w+') as f:
             json.dump(new_key.to_json(), f)
 
+    def reset_node_key(self, new_key):
+        self.node_key = new_key
+        with open(os.path.join(self.node_dir, "node_key.json"), 'w+') as f:
+            json.dump(new_key.to_json(), f)
+
     def cleanup(self):
         if self.cleaned:
             return
-        self.kill()
+
+        try:
+            self.kill()
+        except:
+            print("Kill failed on cleanup!")
+            traceback.print_exc()
+            print("\n\n")
+
         # move the node dir to avoid weird interactions with multiple serial test invocations
         target_path = self.node_dir + '_finished'
         if os.path.exists(target_path) and os.path.isdir(target_path):
diff --git a/pytest/lib/peer.py b/pytest/lib/peer.py
index 579b902f3bb..5c8ce57522f 100644
--- a/pytest/lib/peer.py
+++ b/pytest/lib/peer.py
@@ -1,4 +1,5 @@ import asyncio +import concurrent import hashlib import struct @@ -6,7 +7,7 @@ from messages import schema from messages.crypto import PublicKey, Signature from messages.network import (EdgeInfo, GenesisId, Handshake, PeerChainInfo, - PeerMessage) + PeerMessage, RoutedMessage, PeerIdOrHash) from serializer import BinarySerializer from nacl.signing import SigningKey from typing import Optional @@ -15,7 +16,9 @@ class Connection: - def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + + def __init__(self, reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): self.reader = reader self.writer = writer self.is_closed = False @@ -30,6 +33,7 @@ async def send_raw(self, raw_message): self.writer.write(raw_message) await self.writer.drain() + # returns None on timeout async def recv(self, expected=None): while True: response_raw = await self.recv_raw() @@ -41,7 +45,8 @@ async def recv(self, expected=None): response = BinarySerializer(schema).deserialize( response_raw, PeerMessage) - if expected is None or response.enum == expected: + if expected is None or response.enum == expected or ( + callable(expected) and expected(response)): return response async def recv_raw(self): @@ -74,7 +79,10 @@ async def connect(addr) -> Connection: return conn -def create_handshake(my_key_pair_nacl, their_pk_serialized, listen_port, version=0): +def create_handshake(my_key_pair_nacl, + their_pk_serialized, + listen_port, + version=0): """ Create handshake message but with placeholders in: - version @@ -137,7 +145,10 @@ def sign_handshake(my_key_pair_nacl, handshake): hashlib.sha256(arr).digest()).signature -async def run_handshake(conn: Connection, target_public_key: PublicKey, key_pair: SigningKey, listen_port=12345): +async def run_handshake(conn: Connection, + target_public_key: PublicKey, + key_pair: SigningKey, + listen_port=12345): handshake = create_handshake(key_pair, target_public_key, listen_port) sign_handshake(key_pair, handshake.Handshake) @@ -154,9 +165,41 @@ async def run_handshake(conn: Connection, target_public_key: PublicKey, key_pair if response.enum == 'HandshakeFailure' and response.HandshakeFailure[1].enum == 'ProtocolVersionMismatch': pvm = response.HandshakeFailure[1].ProtocolVersionMismatch + print(pvm) handshake.Handshake.version = pvm sign_handshake(key_pair, handshake.Handshake) await conn.send(handshake) response = await conn.recv() - assert response.enum == 'Handshake', response.enum + assert response.enum == 'Handshake', response.enum if response.enum != 'HandshakeFailure' else response.HandshakeFailure[1].enum + + +def create_and_sign_routed_peer_message(routed_msg_body, target_node, + my_key_pair_nacl): + routed_msg = RoutedMessage() + routed_msg.target = PeerIdOrHash() + routed_msg.target.enum = 'PeerId' + routed_msg.target.PeerId = PublicKey() + routed_msg.target.PeerId.keyType = 0 + routed_msg.target.PeerId.data = base58.b58decode( + target_node.node_key.pk[len(ED_PREFIX):]) + routed_msg.author = PublicKey() + routed_msg.author.keyType = 0 + routed_msg.author.data = bytes(my_key_pair_nacl.verify_key) + routed_msg.ttl = 100 + routed_msg.body = routed_msg_body + routed_msg.signature = Signature() + routed_msg.signature.keyType = 0 + + routed_msg_arr = bytes( + bytearray([0, 0]) + routed_msg.target.PeerId.data + bytearray([0]) + + routed_msg.author.data + + BinarySerializer(schema).serialize(routed_msg.body)) + routed_msg_hash = hashlib.sha256(routed_msg_arr).digest() + routed_msg.signature.data = 
my_key_pair_nacl.sign(routed_msg_hash).signature + + peer_message = PeerMessage() + peer_message.enum = 'Routed' + peer_message.Routed = routed_msg + + return peer_message diff --git a/pytest/lib/serializer.py b/pytest/lib/serializer.py index a7a1b804a3f..95ecdc56f88 100644 --- a/pytest/lib/serializer.py +++ b/pytest/lib/serializer.py @@ -113,6 +113,8 @@ def serialize_struct(self, obj): self.serialize_num(idx, 1) self.serialize_field(getattr(obj, fieldName), fieldType) break + else: + assert False, name else: assert False, structSchema diff --git a/pytest/tests/sanity/repro_2916.py b/pytest/tests/sanity/repro_2916.py new file mode 100644 index 00000000000..cb1254cedf7 --- /dev/null +++ b/pytest/tests/sanity/repro_2916.py @@ -0,0 +1,108 @@ +# Spins up two nodes with two shards, waits for a couple of blocks, snapshots the +# latest chunks, and requests both chunks from the first node, asking for +# receipts for both shards in both requests. We expect the first node to +# respond to exactly one of the requests, for the shard it tracks (for the +# shard it doesn't track it will only have the receipts for the shard it does +# track). +# +# We then kill both nodes, and restart the first node, and do the same +# requests. We expect it to respond the same way. Before 2916 is fixed, it +# fails to respond to the request it was previously responding to due to +# incorrect reconstruction of the receipts. + +import asyncio, sys, time +import socket, base58 +import nacl.signing, hashlib + +sys.path.append('lib') + +from cluster import start_cluster +from peer import * +from utils import obj_to_string + +from messages.tx import * +from messages.block import * +from messages.crypto import * +from messages.network import * + +async def main(): + # start a cluster with two shards + nodes = start_cluster(2, 0, 2, None, [], {}) + + started = time.time() + + while True: + if time.time() - started > 10: + assert False, "Giving up waiting for two blocks" + + status = nodes[0].get_status() + hash_ = status['sync_info']['latest_block_hash'] + height = status['sync_info']['latest_block_height'] + + if height > 2: + block = nodes[0].get_block(hash_) + chunk_hashes = [base58.b58decode(x['chunk_hash']) for x in block['result']['chunks']] + + assert len(chunk_hashes) == 2 + assert all([len(x) == 32 for x in chunk_hashes]) + + break + + my_key_pair_nacl = nacl.signing.SigningKey.generate() + received_responses = [None, None] + +# step = 0: before the node is killed +# step = 1: after the node is killed + for step in range(2): + + conn0 = await connect(nodes[0].addr()) + await run_handshake(conn0, nodes[0].node_key.pk, my_key_pair_nacl) + for shard_ord, chunk_hash in enumerate(chunk_hashes): + + request = PartialEncodedChunkRequestMsg() + request.chunk_hash = chunk_hash + request.part_ords = [] + request.tracking_shards = [0, 1] + + routed_msg_body = RoutedMessageBody() + routed_msg_body.enum = 'PartialEncodedChunkRequest' + routed_msg_body.PartialEncodedChunkRequest = request + + peer_message = create_and_sign_routed_peer_message(routed_msg_body, nodes[0], my_key_pair_nacl) + + await conn0.send(peer_message) + + received_response = False + + def predicate(response): + return response.enum == 'Routed' and response.Routed.body.enum == 'PartialEncodedChunkResponse' + + try: + response = await asyncio.wait_for(conn0.recv(predicate), 5) + except concurrent.futures._base.TimeoutError: + response = None + + if response is not None: + print("Received response for shard %s" % shard_ord) + received_response = True + else: +
print("Didn't receive response for shard %s" % shard_ord) + + if step == 0: + received_responses[shard_ord] = received_response + else: + assert received_responses[shard_ord] == received_response, "The response doesn't match for the chunk in shard %s. Received response before node killed: %s, after: %s" % (shard_ord, received_responses[shard_ord], received_response) + + # we expect the first node to respond to only one of the chunk requests, for the shard assigned to it + assert received_responses[0] != received_responses[1], received_responses + + if step == 0: + print("Killing and restarting nodes") + nodes[1].kill() + nodes[0].kill() + nodes[0].start(None, None) + time.sleep(1) + + +asyncio.run(main()) + diff --git a/pytest/tests/sanity/rpc_sync.py b/pytest/tests/sanity/rpc_sync.py deleted file mode 100644 index 464735f9c90..00000000000 --- a/pytest/tests/sanity/rpc_sync.py +++ /dev/null @@ -1,37 +0,0 @@ -# test that rpc is properly disabled when a node is syncing - -import sys, time - -sys.path.append('lib') - -from cluster import start_cluster - -TIME_OUT = 100 -TARGET_HEIGHT = 100 - -nodes = start_cluster( - 1, 1, 1, None, - [["epoch_length", 50], - ["block_producer_kickout_threshold", 70]], {1: {"tracked_shards": [0]}}) - -time.sleep(2) -nodes[1].kill() - -status = nodes[0].get_status() -height = status['sync_info']['latest_block_height'] - -started = time.time() -while height < TARGET_HEIGHT: - assert time.time() - started < TIME_OUT - time.sleep(2) - status = nodes[0].get_status() - height = status['sync_info']['latest_block_height'] - -nodes[1].start(nodes[0].node_key.pk, nodes[0].addr()) -time.sleep(2) - -status = nodes[1].get_status() -assert status['sync_info']['syncing'] - -res = nodes[1].json_rpc('block', [20]) -assert 'IsSyncing' in res['error']['data'], res diff --git a/pytest/tests/sanity/switch_node_key.py b/pytest/tests/sanity/switch_node_key.py new file mode 100644 index 00000000000..944c306ab6f --- /dev/null +++ b/pytest/tests/sanity/switch_node_key.py @@ -0,0 +1,57 @@ +# Spin up two validating nodes. Stop one of them after one epoch, switch node key (peer id), and restart. +# Make sure that both nodes can still produce blocks.
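+# (the replacement node key below is derived from a fixed all-ones seed, so the new peer id is deterministic across runs)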
+ +import sys, time, base58, nacl.bindings + +sys.path.append('lib') + +from cluster import start_cluster, Key + +EPOCH_LENGTH = 40 +TIMEOUT = 50 + +config1 = { + "network": { + "ttl_account_id_router": { + "secs": 1, + "nanos": 0 + }, + } +} +nodes = start_cluster(2, 0, 1, None, [["epoch_length", EPOCH_LENGTH], ["block_producer_kickout_threshold", 30], + ["chunk_producer_kickout_threshold", 30]], {1: config1}) +time.sleep(2) + +status1 = nodes[1].get_status() +height1 = status1['sync_info']['latest_block_height'] +block = nodes[1].get_block(height1) +epoch_id = block['result']['header']['epoch_id'] + +start = time.time() +while True: + assert time.time() - start < TIMEOUT + time.sleep(1) + status1 = nodes[1].get_status() + height1 = status1['sync_info']['latest_block_height'] + cur_block = nodes[1].get_block(height1) + if cur_block['result']['header']['epoch_id'] != epoch_id: + break + +nodes[1].kill() + +seed = bytes([1] * 32) +public_key, secret_key = nacl.bindings.crypto_sign_seed_keypair(seed) +node_key = Key("", base58.b58encode(public_key).decode('utf-8'), base58.b58encode(secret_key).decode('utf-8')) +nodes[1].reset_node_key(node_key) +nodes[1].start(nodes[0].node_key.pk, nodes[0].addr()) +time.sleep(2) + +start = time.time() +while height1 < EPOCH_LENGTH * 2 + 5: + assert time.time() - start < TIMEOUT * 2 + time.sleep(1) + status1 = nodes[1].get_status() + height1 = status1['sync_info']['latest_block_height'] + +validators = nodes[1].get_validators() +assert len(validators['result']['next_validators']) == 2, f'unexpected number of validators, current validators: {status1["validators"]}' diff --git a/pytest/tests/stress/stress.py b/pytest/tests/stress/stress.py index 8af37b24d49..7108c427c2a 100644 --- a/pytest/tests/stress/stress.py +++ b/pytest/tests/stress/stress.py @@ -23,7 +23,7 @@ # `staking2.py` tests some basic stake invariants # This test also completely disables rewards, which simplifies ensuring total supply invariance and balance invariances -import sys, time, base58, random, inspect, traceback, requests +import sys, time, base58, random, inspect, traceback, requests, logging from multiprocessing import Process, Value, Lock sys.path.append('lib') @@ -34,12 +34,17 @@ from network import init_network_pillager, stop_network, resume_network sys.stdout = Unbuffered(sys.stdout) +logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) TIMEOUT = 1500 # after how much time to shut down the test -TIMEOUT_SHUTDOWN = 120 # time to wait after the shutdown was initiated before -MAX_STAKE = int(1e26) +TIMEOUT_SHUTDOWN = 120 # time to wait after the shutdown was initiated before failing the test due to process stalling +MAX_STAKE = int(1e32) EPOCH_LENGTH = 20 +# How many times to try to send transactions to each validator. +# Is only applicable in the scenarios where we expect failures in tx sends. +SEND_TX_ATTEMPTS = 5 + block_timeout = 20 # if two blocks are not produced within that many seconds, the test will fail. 
The timeout is increased if nodes are restarted or network is being messed up with balances_timeout = 15 # how long to tolerate for balances to update after txs are sent tx_tolerance = 0.1 @@ -68,13 +73,20 @@ def wrapper(stopped, error, *args): return wrapper -def get_recent_hash(node): +def get_recent_hash(node, sync_timeout): # return the parent of the last block known to the observer # don't return the last block itself, since some validators might have not seen it yet # also returns the height of the actual last block (so the height doesn't match the hash!) status = node.get_status() hash_ = status['sync_info']['latest_block_hash'] info = node.json_rpc('block', [hash_]) + + for attempt in range(sync_timeout): + if 'error' in info and info['error']['data'] == 'IsSyncing': + time.sleep(1) + info = node.json_rpc('block', [hash_]) + + assert 'result' in info, info hash_ = info['result']['header']['hash'] return hash_, status['sync_info']['latest_block_height'] @@ -104,7 +116,7 @@ def get_future_time(): if time.time() < change_status_at[i]: continue if nodes_stopped[i]: - print("Node set: starting node %s" % i) + logging.info("Node set: starting node %s" % i) # figuring out a live node with `node_restart` monkey is not trivial # for simplicity just boot from the observer node # `node_restart` doesn't boot from the observer, increasing coverage @@ -115,8 +127,8 @@ def get_future_time(): wipe = False if random.choice([True, False]): wipe = True - #node.reset_data() - print("Node set: stopping%s node %s" % + #node.reset_data() # TODO + logging.info("Node set: stopping%s node %s" % (" and wiping" if wipe else "", i)) nodes_stopped[i] = not nodes_stopped[i] change_status_at[i] = get_future_time() @@ -135,19 +147,23 @@ def monkey_node_restart(stopped, error, nodes, nonces): node = nodes[node_idx] # don't kill the same node too frequently, give it time to reboot and produce something while True: - _, h = get_recent_hash(node) + _, h = get_recent_hash(node, 30) assert h >= heights_after_restart[node_idx], "%s > %s" % ( h, heights_after_restart[node_idx]) if h > heights_after_restart[node_idx]: break time.sleep(1) - print("NUKING NODE %s" % node_idx) + reset_data = random.choice([True, False, False]) + logging.info("NUKING NODE %s%s" % (node_idx, " AND WIPING ITS STORAGE" if reset_data else "")) node.kill() + if reset_data: + #node.reset_data() # TODO + pass node.start(boot_node.node_key.pk, boot_node.addr()) - print("NODE %s IS BACK UP" % node_idx) + logging.info("NODE %s IS BACK UP" % node_idx) - _, heights_after_restart[node_idx] = get_recent_hash(node) + _, heights_after_restart[node_idx] = get_recent_hash(node, 30) time.sleep(5) @@ -184,7 +200,7 @@ def get_balances(): expected_balances = get_balances() min_balances = [x - MAX_STAKE for x in expected_balances] total_supply = (sum(expected_balances)) - print("TOTAL SUPPLY: %s" % total_supply) + logging.info("TOTAL SUPPLY: %s" % total_supply) last_iter_switch = time.time() mode = 0 # 0 = send more tx, 1 = wait for balances @@ -198,13 +214,22 @@ def get_balances(): validator_ids = get_validator_ids(nodes) if time.time() - last_iter_switch > balances_timeout: if mode == 0: - print("%s TRANSACTIONS SENT. WAITING FOR BALANCES" % tx_count) + logging.info("%s TRANSACTIONS SENT. 
WAITING FOR BALANCES" % tx_count) mode = 1 else: - print( + logging.info( "BALANCES NEVER CAUGHT UP, CHECKING UNFINISHED TRANSACTIONS" ) - snapshot_expected_balances = [x for x in expected_balances] + + def trace_reverted_txs(last_tx_set, tx_ords): + logging.info("\n\nREVERTING THE FOLLOWING TXS WOULD BE ENOUGH:\n") + for tx_ord in tx_ords: + tx = last_tx_set[tx_ord] + logging.info("\nTRANSACTION %s" % tx_ord) + logging.info("TX tuple: %s" % (tx[1:],)) + response = nodes[-1].json_rpc( + 'tx', [tx[3], "test%s" % tx[1]], timeout=1) + logging.info("Status: %s", response) def revert_txs(): nonlocal expected_balances @@ -212,26 +237,20 @@ def revert_txs(): bad = 0 for tx in last_tx_set: tx_happened = True - try: - response = nodes[-1].json_rpc( - 'tx', [tx[3], "test%s" % tx[1]], timeout=1) - - # due to #2195 if the tx was dropped, the query today times out. - if 'error' in response and 'data' in response[ - 'error'] and response['error'][ - 'data'] == 'Timeout': - tx_happened = False - elif 'result' in response and 'receipts_outcome' in response[ - 'result']: - tx_happened = len( - response['result']['receipts_outcome']) > 0 - else: - assert False, response - # This exception handler is also due to #2195 - except requests.exceptions.ReadTimeout: + + response = nodes[-1].json_rpc( + 'tx', [tx[3], "test%s" % tx[1]], timeout=1) + + if 'error' in response and 'data' in response[ + 'error'] and "doesn't exist" in response['error'][ + 'data']: tx_happened = False - except: - raise + elif 'result' in response and 'receipts_outcome' in response[ + 'result']: + tx_happened = len( + response['result']['receipts_outcome']) > 0 + else: + assert False, response if not tx_happened: bad += 1 @@ -244,7 +263,7 @@ def revert_txs(): good, bad = revert_txs() if expected_balances == get_balances(): # reverting helped - print("REVERTING HELPED, TX EXECUTED: %s, TX LOST: %s" % + logging.info("REVERTING HELPED, TX EXECUTED: %s, TX LOST: %s" % (good, bad)) bad_ratio = bad / (good + bad) if bad_ratio > rolling_tolerance: @@ -260,17 +279,46 @@ def revert_txs(): last_tx_set = [] else: # still no match, fail - print( + logging.info( "REVERTING DIDN'T HELP, TX EXECUTED: %s, TX LOST: %s" % (good, bad)) - for step in range( - 10 - ): # trace balances for 20 seconds to see if they are catching up - print(get_balances()) - time.sleep(2) - expected_balances = snapshot_expected_balances - good, bad = revert_txs() - print( + + for i in range(0, len(last_tx_set)): + tx = last_tx_set[i] + expected_balances[tx[1]] += tx[4] + expected_balances[tx[2]] -= tx[4] + + if get_balances() == expected_balances: + trace_reverted_txs(last_tx_set, [i]) + + for j in range(i + 1, len(last_tx_set)): + tx = last_tx_set[j] + expected_balances[tx[1]] += tx[4] + expected_balances[tx[2]] -= tx[4] + + if get_balances() == expected_balances: + trace_reverted_txs(last_tx_set, [i, j]) + + for k in range(j + 1, len(last_tx_set)): + tx = last_tx_set[k] + expected_balances[tx[1]] += tx[4] + expected_balances[tx[2]] -= tx[4] + + if get_balances() == expected_balances: + trace_reverted_txs(last_tx_set, [i, j, k]) + + expected_balances[tx[1]] -= tx[4] + expected_balances[tx[2]] += tx[4] + + tx = last_tx_set[j] + expected_balances[tx[1]] -= tx[4] + expected_balances[tx[2]] += tx[4] + + tx = last_tx_set[i] + expected_balances[tx[1]] -= tx[4] + expected_balances[tx[2]] += tx[4] + + logging.info( "The latest and greatest stats on successful/failed: %s/%s" % (good, bad)) assert False, "Balances didn't update in time. 
Expected: %s, received: %s" % ( @@ -289,20 +337,44 @@ def revert_txs(): amt = random.randint(0, min_balances[from_]) nonce_val, nonce_lock = nonces[from_] - hash_, _ = get_recent_hash(nodes[-1]) + hash_, _ = get_recent_hash(nodes[-1], 5) with nonce_lock: tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to, amt, nonce_val.value, base58.b58decode(hash_.encode('utf8'))) - for validator_id in validator_ids: - try: - tx_hash = nodes[validator_id].send_tx(tx)['result'] - except (requests.exceptions.ReadTimeout, - requests.exceptions.ConnectionError): - if not network_issues_expected and not nodes[ - validator_id].mess_with: - raise + + # Loop trying to send the tx to all the validators, until at least one receives it + tx_hash = None + for send_attempt in range(SEND_TX_ATTEMPTS): + shuffled_validator_ids = [x for x in validator_ids] + random.shuffle(shuffled_validator_ids) + for validator_id in shuffled_validator_ids: + try: + info = nodes[validator_id].send_tx(tx) + if 'error' in info and info['error']['data'] == 'IsSyncing': + pass + + elif 'result' in info: + tx_hash = info['result'] + break + + else: + assert False, info + + except (requests.exceptions.ReadTimeout, + requests.exceptions.ConnectionError): + if not network_issues_expected and not nodes[ + validator_id].mess_with: + raise + + if tx_hash is not None: + break + + time.sleep(1) + + else: + assert False, "Failed to send the transaction after %s attempts" % SEND_TX_ATTEMPTS last_tx_set.append((tx, from_, to, tx_hash, amt)) nonce_val.value = nonce_val.value + 1 @@ -315,7 +387,7 @@ def revert_txs(): else: if get_balances() == expected_balances: - print("BALANCES CAUGHT UP, BACK TO SPAMMING TXS") + logging.info("BALANCES CAUGHT UP, BACK TO SPAMMING TXS") min_balances = [x - MAX_STAKE for x in expected_balances] tx_count = 0 mode = 0 @@ -329,7 +401,7 @@ def revert_txs(): def get_the_guy_to_mess_up_with(nodes): - _, height = get_recent_hash(nodes[-1]) + _, height = get_recent_hash(nodes[-1], 5) return (height // EPOCH_LENGTH) % (len(nodes) - 1) @@ -340,7 +412,7 @@ def monkey_staking(stopped, error, nodes, nonces): whom = random.randint(0, len(nonces) - 2) status = nodes[-1].get_status() - hash_, _ = get_recent_hash(nodes[-1]) + hash_, _ = get_recent_hash(nodes[-1], 5) who_can_unstake = get_the_guy_to_mess_up_with(nodes) @@ -389,7 +461,7 @@ def blocks_tracker(stopped, error, nodes, nonces): status = nodes[val_id].get_status() if status['validators'] != last_validators and val_id == -1: last_validators = status['validators'] - print( + logging.info( "VALIDATORS TRACKER: validators set changed, new set: %s" % [x['account_id'] for x in last_validators]) hash_ = status['sync_info']['latest_block_hash'] @@ -399,11 +471,11 @@ def blocks_tracker(stopped, error, nodes, nonces): if stopped.value != 0: done = True if not every_ten or largest_height % 10 == 0: - print("BLOCK TRACKER: new height %s" % largest_height) + logging.info("BLOCK TRACKER: new height %s" % largest_height) if largest_height >= 20: if not every_ten: every_ten = True - print( + logging.info( "BLOCK TRACKER: switching to tracing every ten blocks to reduce spam" ) largest_height = height @@ -456,10 +528,10 @@ def blocks_tracker(stopped, error, nodes, nonces): if divergence > largest_divergence: largest_divergence = divergence - print("=== BLOCK TRACKER SUMMARY ===") - print("Largest height: %s" % largest_height) - print("Largest divergence: %s" % largest_divergence) - print("Per node: %s" % largest_per_node) + logging.info("=== BLOCK TRACKER SUMMARY ===") +
logging.info("Largest height: %s" % largest_height) + logging.info("Largest divergence: %s" % largest_divergence) + logging.info("Per node: %s" % largest_per_node) if not network_issues_expected: assert largest_divergence < len(nodes) @@ -483,7 +555,8 @@ def doit(s, n, N, k, monkeys, timeout): N, k + 1, s, config, [["min_gas_price", 0], ["max_inflation_rate", [0, 1]], ["epoch_length", EPOCH_LENGTH], - ["block_producer_kickout_threshold", 70]], local_config_changes) + ["block_producer_kickout_threshold", 10], + ["chunk_producer_kickout_threshold", 10]], local_config_changes) started = time.time() @@ -502,9 +575,9 @@ def doit(s, n, N, k, monkeys, timeout): node.mess_with = False monkey_names = [x.__name__ for x in monkeys] - print(monkey_names) + logging.info(monkey_names) if 'monkey_local_network' in monkey_names or 'monkey_global_network' in monkey_names: - print( + logging.info( "There are monkeys messing up with network, initializing the infra") if config['local']: init_network_pillager() @@ -547,10 +620,10 @@ def check_errors(): check_errors() time.sleep(1) - print("") - print("==========================================") - print("# TIMEOUT IS HIT, SHUTTING DOWN THE TEST #") - print("==========================================") + logging.info("") + logging.info("==========================================") + logging.info("# TIMEOUT IS HIT, SHUTTING DOWN THE TEST #") + logging.info("==========================================") stopped.value = 1 started_shutdown = time.time() while True: diff --git a/runtime/near-vm-runner-standalone/README.md b/runtime/near-vm-runner-standalone/README.md index b2f071a753e..0a01efdc204 100644 --- a/runtime/near-vm-runner-standalone/README.md +++ b/runtime/near-vm-runner-standalone/README.md @@ -6,3 +6,10 @@ and the all effects of computing the execution result of a smart contract are en One can use `near-vm-runner-standalone` to test the smart contracts, e.g. with integration tests to make sure it has expected behavior once deployed to the blockchain. + +To use it, run it like this: + + cargo run -- --wasm-file ./status_message.wasm --method-name set_status --input '{"message": "12345"}' + cargo run -- --wasm-file ./status_message.wasm --method-name get_status --input '{"account_id": "bob"}' --state '{"U1RBVEU=":"AQAAAAMAAABib2IFAAAAMTIzNDU="}' + +I.e., persistent state can be passed from one run to the next via the `--state` parameter. diff --git a/runtime/near-vm-runner/src/wasmer_runner.rs b/runtime/near-vm-runner/src/wasmer_runner.rs index da4f970ec47..6bba6a1d3cf 100644 --- a/runtime/near-vm-runner/src/wasmer_runner.rs +++ b/runtime/near-vm-runner/src/wasmer_runner.rs @@ -189,6 +189,9 @@ pub fn run_wasmer<'a>( "Execution of smart contracts is only supported for x86 and x86_64 CPU architectures."
); } + if !is_x86_feature_detected!("avx") { + panic!("AVX support is required in order to run Wasmer VM Singlepass backend."); + } if method_name.is_empty() { return ( None, diff --git a/scripts/docker-release.sh b/scripts/docker-release.sh index beb034571fa..01056492249 100755 --- a/scripts/docker-release.sh +++ b/scripts/docker-release.sh @@ -1,7 +1,8 @@ #!/bin/bash set -euo pipefail -branch=${BUILDKITE_BRANCH/:/_} +branch=${BUILDKITE_BRANCH//:/_} +branch=${branch//\//_} commit=${BUILDKITE_COMMIT} if [[ ${commit} == "HEAD" ]]; then commit=$(git rev-parse HEAD) diff --git a/test-utils/state-viewer/src/main.rs b/test-utils/state-viewer/src/main.rs index 249585bde32..9e5c2b721df 100644 --- a/test-utils/state-viewer/src/main.rs +++ b/test-utils/state-viewer/src/main.rs @@ -5,15 +5,16 @@ use std::sync::Arc; use ansi_term::Color::Red; use clap::{App, Arg, SubCommand}; -use near_chain::types::BlockHeaderInfo; -use near_chain::{ChainStore, ChainStoreAccess, RuntimeAdapter}; +use near_chain::chain::collect_receipts_from_response; +use near_chain::types::{ApplyTransactionResult, BlockHeaderInfo}; +use near_chain::{ChainStore, ChainStoreAccess, ChainStoreUpdate, RuntimeAdapter}; use near_logger_utils::init_integration_logger; use near_network::peer_store::PeerStore; use near_primitives::block::BlockHeader; use near_primitives::hash::CryptoHash; use near_primitives::serialize::to_base; use near_primitives::state_record::StateRecord; -use near_primitives::types::{BlockHeight, StateRoot}; +use near_primitives::types::{BlockHeight, ChunkExtra, ShardId, StateRoot}; use near_store::test_utils::create_test_store; use near_store::{create_store, Store, TrieIterator}; use neard::{get_default_home, get_store_path, load_config, NearConfig, NightshadeRuntime}; @@ -21,19 +22,29 @@ use state_dump::state_dump; mod state_dump; +#[allow(unused)] +enum LoadTrieMode { + /// Load latest state + Latest, + /// Load prev state at some height + Height(BlockHeight), + /// Load the prev state of the last final block from some height + LastFinalFromHeight(BlockHeight), +} + fn load_trie( store: Arc<Store>, home_dir: &Path, near_config: &NearConfig, ) -> (NightshadeRuntime, Vec<StateRoot>, BlockHeader) { - load_trie_stop_at_height(store, home_dir, near_config, None) + load_trie_stop_at_height(store, home_dir, near_config, LoadTrieMode::Latest) } fn load_trie_stop_at_height( store: Arc<Store>, home_dir: &Path, near_config: &NearConfig, - stop_height: Option<BlockHeight>, + mode: LoadTrieMode, ) -> (NightshadeRuntime, Vec<StateRoot>, BlockHeader) { let mut chain_store = ChainStore::new(store.clone(), near_config.genesis.config.genesis_height); @@ -45,8 +56,8 @@ fn load_trie_stop_at_height( near_config.client_config.tracked_shards.clone(), ); let head = chain_store.head().unwrap(); - let last_block = match stop_height { - Some(height) => { + let last_block = match mode { + LoadTrieMode::LastFinalFromHeight(height) => { // find the first final block whose height is at least `height`.
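// (the loop below starts the search at `height + 1` and walks forward until such a block is found)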
let mut cur_height = height + 1; loop { @@ -71,7 +82,11 @@ } } } - None => chain_store.get_block(&head.last_block_hash).unwrap().clone(), + LoadTrieMode::Height(height) => { + let block_hash = chain_store.get_block_hash_by_height(height).unwrap(); + chain_store.get_block(&block_hash).unwrap().clone() + } + LoadTrieMode::Latest => chain_store.get_block(&head.last_block_hash).unwrap().clone(), }; let state_roots = last_block.chunks().iter().map(|chunk| chunk.inner.prev_state_root).collect(); (runtime, state_roots, last_block.header().clone()) @@ -178,6 +193,140 @@ fn replay_chain( } } +fn apply_block_at_height( + store: Arc<Store>, + home_dir: &Path, + near_config: &NearConfig, + height: BlockHeight, + shard_id: ShardId, +) { + let mut chain_store = ChainStore::new(store.clone(), near_config.genesis.config.genesis_height); + let runtime = NightshadeRuntime::new( + &home_dir, + store, + Arc::clone(&near_config.genesis), + near_config.client_config.tracked_accounts.clone(), + near_config.client_config.tracked_shards.clone(), + ); + let block_hash = chain_store.get_block_hash_by_height(height).unwrap(); + let block = chain_store.get_block(&block_hash).unwrap().clone(); + assert_eq!(block.chunks()[shard_id as usize].height_included, height); + let chunk = + chain_store.get_chunk(&block.chunks()[shard_id as usize].chunk_hash()).unwrap().clone(); + let prev_block = chain_store.get_block(&block.header().prev_hash()).unwrap().clone(); + let mut chain_store_update = ChainStoreUpdate::new(&mut chain_store); + let receipt_proof_response = chain_store_update + .get_incoming_receipts_for_shard( + shard_id, + block_hash, + prev_block.chunks()[shard_id as usize].height_included, + ) + .unwrap(); + let receipts = collect_receipts_from_response(&receipt_proof_response); + + let apply_result = runtime + .apply_transactions( + shard_id, + &chunk.header.inner.prev_state_root, + height, + block.header().raw_timestamp(), + block.header().prev_hash(), + block.hash(), + &receipts, + &chunk.transactions, + &chunk.header.inner.validator_proposals, + prev_block.header().gas_price(), + chunk.header.inner.gas_limit, + &block.header().challenges_result(), + ) + .unwrap(); + let (outcome_root, _) = ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes); + let chunk_extra = ChunkExtra::new( + &apply_result.new_root, + outcome_root, + apply_result.validator_proposals, + apply_result.total_gas_burnt, + chunk.header.inner.gas_limit, + apply_result.total_balance_burnt, + ); + + println!( + "apply chunk for shard {} at height {}, resulting chunk extra {:?}", + shard_id, height, chunk_extra + ); + if let Ok(chunk_extra) = chain_store.get_chunk_extra(&block_hash, shard_id) { + println!("Existing chunk extra: {:?}", chunk_extra); + } else { + println!("no existing chunk extra available"); + } +} + +fn view_chain( + store: Arc<Store>, + near_config: &NearConfig, + height: Option<BlockHeight>, + view_block: bool, + view_chunks: bool, +) { + let mut chain_store = ChainStore::new(store.clone(), near_config.genesis.config.genesis_height); + let block = { + match height { + Some(h) => { + let block_hash = + chain_store.get_block_hash_by_height(h).expect("Block does not exist"); + chain_store.get_block(&block_hash).unwrap().clone() + } + None => { + let head = chain_store.head().unwrap(); + chain_store.get_block(&head.last_block_hash).unwrap().clone() + } + } + }; + + let mut chunk_extras = vec![]; + let mut chunks = vec![]; + for (i, chunk_header) in block.chunks().iter().enumerate() { + if chunk_header.height_included
== block.header().height() { + chunk_extras.push(( + i, + chain_store.get_chunk_extra(&block.hash(), i as ShardId).unwrap().clone(), + )); + chunks.push((i, chain_store.get_chunk(&chunk_header.hash).unwrap().clone())); + } + } + + if height.is_none() { + let head = chain_store.head().unwrap(); + println!("head: {:?}", head); + } else { + println!("block height {}, hash {}", block.header().height(), block.hash()); + } + + for (shard_id, chunk_extra) in chunk_extras { + println!("shard {}, chunk extra: {:?}", shard_id, chunk_extra); + } + if view_block { + println!("last block: {:?}", block); + } + if view_chunks { + for (shard_id, chunk) in chunks { + println!("shard {}, chunk: {:?}", shard_id, chunk); + } + } +} + fn main() { init_integration_logger(); @@ -236,6 +385,45 @@ fn main() { ) .help("replay headers from chain"), ) + .subcommand( + SubCommand::with_name("apply") + .arg( + Arg::with_name("height") + .long("height") + .required(true) + .help("Height of the block to apply") + .takes_value(true), + ) + .arg( + Arg::with_name("shard_id") + .long("shard_id") + .help("Id of the shard to apply") + .takes_value(true), + ) + .help("apply block at some height for shard"), + ) + .subcommand( + SubCommand::with_name("view_chain") + .arg( + Arg::with_name("height") + .long("height") + .help("height of the block") + .takes_value(true), + ) + .arg( + Arg::with_name("block") + .long("block") + .help("Whether to print the last block") + .takes_value(false), + ) + .arg( + Arg::with_name("chunk") + .long("chunk") + .help("Whether to print the last chunks") + .takes_value(false), + ) + .help("View head of the storage"), + ) .get_matches(); let home_dir = matches.value_of("home").map(|dir| Path::new(dir)).unwrap(); @@ -266,8 +454,12 @@ fn main() { } ("dump_state", Some(args)) => { let height = args.value_of("height").map(|s| s.parse::<BlockHeight>().unwrap()); + let mode = match height { + Some(h) => LoadTrieMode::LastFinalFromHeight(h), + None => LoadTrieMode::Latest, + }; let (runtime, state_roots, header) = - load_trie_stop_at_height(store, home_dir, &near_config, height); + load_trie_stop_at_height(store, home_dir, &near_config, mode); let height = header.height(); let home_dir = PathBuf::from(&home_dir); @@ -295,6 +487,18 @@ fn main() { let end_index = args.value_of("end_index").map(|s| s.parse::<BlockHeight>().unwrap()).unwrap(); replay_chain(store, home_dir, &near_config, start_index, end_index); } + ("apply", Some(args)) => { + let height = args.value_of("height").map(|s| s.parse::<BlockHeight>().unwrap()).unwrap(); + let shard_id = + args.value_of("shard_id").map(|s| s.parse::<ShardId>().unwrap()).unwrap_or_default(); + apply_block_at_height(store, home_dir, &near_config, height, shard_id); + } + ("view_chain", Some(args)) => { + let height = args.value_of("height").map(|s| s.parse::<BlockHeight>().unwrap()); + let view_block = args.is_present("block"); + let view_chunks = args.is_present("chunk"); + view_chain(store, &near_config, height, view_block, view_chunks); + } (_, _) => unreachable!(), } }
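For reference, the new subcommands can be exercised roughly along these lines (a sketch: invoking the binary via `cargo run` with the `state-viewer` package name and a `--home` flag is an assumption based on the surrounding code; only the subcommand names and flags come from the diff above):

    cargo run --package state-viewer -- --home ~/.near view_chain --height 100 --block --chunk
    cargo run --package state-viewer -- --home ~/.near apply --height 100 --shard_id 0

`view_chain` without `--height` prints the current head; `apply` re-applies the chunk of the given shard at the given height and prints the freshly computed chunk extra next to the one already stored, so the two can be compared.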