Skip to content

Commit

Permalink
Refcounted DB columns
Browse files Browse the repository at this point in the history
- Move refcount logic for ColState from Trie to Store
- Change refcount for ColState 4 byte to 8 byte
- Use the new logic for ColTransactions, deprecate ColTransactionRefCount
- Use it for ColReceiptIdToShardId

This requires a storage version upgrade.

Fixes #3169

Test plan
---------
TODO
  • Loading branch information
mikhailOK committed Aug 19, 2020
1 parent fc300da commit 9e35666
Show file tree
Hide file tree
Showing 15 changed files with 440 additions and 234 deletions.
70 changes: 11 additions & 59 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use near_store::{
ColInvalidChunks, ColLastBlockWithNewChunk, ColNextBlockHashes, ColNextBlockWithNewChunk,
ColOutcomesByBlockHash, ColOutgoingReceipts, ColPartialChunks, ColProcessedBlockHeights,
ColReceiptIdToShardId, ColState, ColStateChanges, ColStateDlInfos, ColStateHeaders,
ColStateParts, ColTransactionRefCount, ColTransactionResult, ColTransactions, ColTrieChanges,
DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate, TrieChanges, WrappedTrieChanges,
ColStateParts, ColTransactionResult, ColTransactions, ColTrieChanges, DBCol,
KeyForStateChanges, ShardTries, Store, StoreUpdate, TrieChanges, WrappedTrieChanges,
CHUNK_TAIL_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY,
LATEST_KNOWN_KEY, SHOULD_COL_GC, SYNC_HEAD_KEY, TAIL_KEY,
};
Expand Down Expand Up @@ -162,8 +162,6 @@ pub trait ChainStoreAccess {
&mut self,
height: BlockHeight,
) -> Result<HashSet<ChunkHash>, Error>;
/// Returns a number of references for Transaction with `tx_hash`
fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result<u64, Error>;
/// Returns a number of references for Block with `block_hash`
fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error>;
/// Check if we saw chunk hash at given height and shard id.
Expand Down Expand Up @@ -743,14 +741,6 @@ impl ChainStoreAccess for ChainStore {
}
}

fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result<u64, Error> {
match self.store.get_ser(ColTransactionRefCount, tx_hash.as_ref()) {
Ok(Some(value)) => Ok(value),
Ok(None) => Ok(0),
Err(e) => Err(e.into()),
}
}

fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error> {
option_to_not_found(
read_with_cache(
Expand Down Expand Up @@ -1124,7 +1114,6 @@ struct ChainStoreCacheUpdate {
next_block_with_new_chunk: HashMap<(CryptoHash, ShardId), CryptoHash>,
last_block_with_new_chunk: HashMap<ShardId, CryptoHash>,
transactions: HashSet<SignedTransaction>,
tx_refcounts: HashMap<CryptoHash, u64>,
block_refcounts: HashMap<CryptoHash, u64>,
block_merkle_tree: HashMap<CryptoHash, PartialMerkleTree>,
block_ordinal_to_hash: HashMap<NumBlocks, CryptoHash>,
Expand Down Expand Up @@ -1372,14 +1361,6 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
self.chain_store.get_all_chunk_hashes_by_height(height)
}

fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result<u64, Error> {
if let Some(refcount) = self.chain_store_cache_update.tx_refcounts.get(tx_hash) {
Ok(*refcount)
} else {
self.chain_store.get_tx_refcount(tx_hash)
}
}

fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error> {
if let Some(refcount) = self.chain_store_cache_update.block_refcounts.get(block_hash) {
Ok(refcount)
Expand Down Expand Up @@ -1994,7 +1975,7 @@ impl<'a> ChainStoreUpdate<'a> {
self.gc_col(ColReceiptIdToShardId, &receipt.receipt_id.into());
}
for transaction in chunk.transactions {
self.gc_col_transaction(transaction.get_hash())?;
self.gc_col(ColTransactions, &transaction.get_hash().into());
}

// 2. Delete chunk_hash-indexed data
Expand Down Expand Up @@ -2225,23 +2206,6 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

pub fn gc_col_transaction(&mut self, tx_hash: CryptoHash) -> Result<(), Error> {
let mut refcount = self.get_tx_refcount(&tx_hash)?;
if refcount == 0 {
debug_assert!(false, "ColTransactionRefCount inconsistency");
return Err(
ErrorKind::GCError("ColTransactionRefCount inconsistency".to_string()).into()
);
}
refcount -= 1;
self.chain_store_cache_update.tx_refcounts.insert(tx_hash, refcount);
if refcount == 0 {
self.gc_col(ColTransactionRefCount, &tx_hash.into());
self.gc_col(ColTransactions, &tx_hash.into());
}
Ok(())
}

fn gc_col(&mut self, col: DBCol, key: &Vec<u8>) {
assert!(SHOULD_COL_GC[col as usize]);
let mut store_update = self.store().store_update();
Expand Down Expand Up @@ -2291,11 +2255,11 @@ impl<'a> ChainStoreUpdate<'a> {
self.chain_store.block_refcounts.cache_remove(key);
}
DBCol::ColReceiptIdToShardId => {
store_update.delete(col, key);
store_update.update_refcount(col, key, &[], -1);
self.chain_store.receipt_id_to_shard_id.cache_remove(key);
}
DBCol::ColTransactions => {
store_update.delete(col, key);
store_update.update_refcount(col, key, &[], -1);
self.chain_store.transactions.cache_remove(key);
}
DBCol::ColChunks => {
Expand Down Expand Up @@ -2347,9 +2311,6 @@ impl<'a> ChainStoreUpdate<'a> {
store_update.delete(col, key);
self.chain_store.last_block_with_new_chunk.cache_remove(key);
}
DBCol::ColTransactionRefCount => {
store_update.delete(col, key);
}
DBCol::ColProcessedBlockHeights => {
store_update.delete(col, key);
self.chain_store.processed_block_heights.cache_remove(key);
Expand All @@ -2368,7 +2329,8 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::ColComponentEdges
| DBCol::ColEpochInfo
| DBCol::ColEpochStart
| DBCol::ColBlockOrdinal => {
| DBCol::ColBlockOrdinal
| DBCol::_ColTransactionRefCount => {
unreachable!();
}
}
Expand Down Expand Up @@ -2471,9 +2433,8 @@ impl<'a> ChainStoreUpdate<'a> {

// Increase transaction refcounts for all included txs
for tx in chunk.transactions.iter() {
let mut refcount = self.chain_store.get_tx_refcount(&tx.get_hash())?;
refcount += 1;
store_update.set_ser(ColTransactionRefCount, tx.get_hash().as_ref(), &refcount)?;
let bytes = tx.try_to_vec().expect("Borsh cannot fail");
store_update.update_refcount(ColTransactions, tx.get_hash().as_ref(), &bytes, 1)
}

store_update.set_ser(ColChunks, chunk_hash.as_ref(), chunk)?;
Expand Down Expand Up @@ -2542,7 +2503,8 @@ impl<'a> ChainStoreUpdate<'a> {
store_update.set_ser(ColOutcomesByBlockHash, block_hash.as_ref(), &hash_set)?;
}
for (receipt_id, shard_id) in self.chain_store_cache_update.receipt_id_to_shard_id.iter() {
store_update.set_ser(ColReceiptIdToShardId, receipt_id.as_ref(), shard_id)?;
let data = shard_id.try_to_vec()?;
store_update.update_refcount(ColReceiptIdToShardId, receipt_id.as_ref(), &data, 1);
}
for ((block_hash, shard_id), next_block_hash) in
self.chain_store_cache_update.next_block_with_new_chunk.iter()
Expand All @@ -2561,16 +2523,6 @@ impl<'a> ChainStoreUpdate<'a> {
block_hash,
)?;
}
for transaction in self.chain_store_cache_update.transactions.iter() {
store_update.set_ser(ColTransactions, transaction.get_hash().as_ref(), transaction)?;
}
for (tx_hash, refcount) in self.chain_store_cache_update.tx_refcounts.drain() {
// tx_refcounts cache is used in GC only.
// While increasing, we write to the storage directly because we add no transaction twice.
if refcount > 0 {
store_update.set_ser(ColTransactionRefCount, &tx_hash.as_ref(), &refcount)?;
}
}
for (block_hash, refcount) in self.chain_store_cache_update.block_refcounts.iter() {
store_update.set_ser(ColBlockRefCount, block_hash.as_ref(), refcount)?;
}
Expand Down
14 changes: 8 additions & 6 deletions chain/chain/src/store_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use near_primitives::syncing::{ShardStateSyncResponseHeader, StateHeaderKey, Sta
use near_primitives::transaction::ExecutionOutcomeWithIdAndProof;
use near_primitives::types::{AccountId, BlockHeight, EpochId, GCCount, ShardId};
use near_primitives::utils::get_block_shard_id_rev;
use near_store::{DBCol, Store, TrieChanges, NUM_COLS, SHOULD_COL_GC, SKIP_COL_GC};
use near_store::{
decode_value_with_rc, DBCol, Store, TrieChanges, NUM_COLS, SHOULD_COL_GC, SKIP_COL_GC,
};
use validate::StoreValidatorError;

use crate::RuntimeAdapter;
Expand Down Expand Up @@ -128,7 +130,7 @@ impl StoreValidator {
self.errors.push(ErrorMessage { key: to_string(&key), col: to_string(&col), err })
}
fn validate_col(&mut self, col: DBCol) -> Result<(), StoreValidatorError> {
for (key, value) in self.store.clone().iter(col) {
for (key, value) in self.store.clone().iter_without_rc_logic(col) {
let key_ref = key.as_ref();
let value_ref = value.as_ref();
match col {
Expand Down Expand Up @@ -277,10 +279,10 @@ impl StoreValidator {
let count = GCCount::try_from_slice(value_ref)?;
self.check(&validate::gc_col_count, &col, &count, col);
}
DBCol::ColTransactionRefCount => {
DBCol::ColTransactions => {
let (_value, rc) = decode_value_with_rc(value_ref);
let tx_hash = CryptoHash::try_from(key_ref)?;
let refcount = u64::try_from_slice(value_ref)?;
self.check(&validate::tx_refcount, &tx_hash, &refcount, col);
self.check(&validate::tx_refcount, &tx_hash, &(rc as u64), col);
}
DBCol::ColBlockRefCount => {
let block_hash = CryptoHash::try_from(key_ref)?;
Expand Down Expand Up @@ -340,7 +342,7 @@ impl StoreValidator {
}
// Check that all refs are counted
if let Err(e) = validate::tx_refcount_final(self) {
self.process_error(e, "TX_REFCOUNT", DBCol::ColTransactionRefCount)
self.process_error(e, "TX_REFCOUNT", DBCol::ColTransactions)
}
// Check that all Block Refcounts are counted
if let Err(e) = validate::block_refcount_final(self) {
Expand Down
14 changes: 6 additions & 8 deletions chain/chain/src/store_validator/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,15 +714,13 @@ pub(crate) fn tx_refcount(
tx_hash: &CryptoHash,
refcount: &u64,
) -> Result<(), StoreValidatorError> {
if let Some(expected) = sv.inner.tx_refcount.get(tx_hash) {
if refcount != expected {
err!("Invalid tx refcount, expected {:?}, found {:?}", expected, refcount)
} else {
sv.inner.tx_refcount.remove(tx_hash);
return Ok(());
}
let expected = sv.inner.tx_refcount.get(tx_hash).map(|&rc| rc).unwrap_or_default();
if *refcount != expected {
err!("Invalid tx refcount, expected {:?}, found {:?}", expected, refcount)
} else {
sv.inner.tx_refcount.remove(tx_hash);
return Ok(());
}
err!("Unexpected Tx found")
}

pub(crate) fn block_refcount(
Expand Down
2 changes: 1 addition & 1 deletion core/primitives/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Version {
pub type DbVersion = u32;

/// Current version of the database.
pub const DB_VERSION: DbVersion = 6;
pub const DB_VERSION: DbVersion = 7;

/// Protocol version type.
pub type ProtocolVersion = u32;
Expand Down
Loading

0 comments on commit 9e35666

Please sign in to comment.