From a0d055404331911c1175777a5491a1e212b6254a Mon Sep 17 00:00:00 2001 From: Mikhail Date: Mon, 17 Aug 2020 14:20:52 -0700 Subject: [PATCH] Refcounted DB columns - Move refcount logic for ColState from Trie to Store - Change refcount for ColState 4 byte to 8 byte - Use the new logic for ColTransactions, deprecate ColTransactionRefCount - Use it for ColReceiptIdToShardId This requires a storage version upgrade. Fixes #3169 Test plan --------- TODO --- chain/chain/src/store.rs | 70 ++------- chain/chain/src/store_validator.rs | 14 +- chain/chain/src/store_validator/validate.rs | 14 +- core/store/src/db.rs | 155 ++++++++++---------- core/store/src/db/refcount.rs | 136 +++++++++++++++++ core/store/src/db/v6.rs | 85 +++++++++++ core/store/src/lib.rs | 48 +++--- core/store/src/migrations.rs | 26 +++- core/store/src/migrations/v6_to_v7.rs | 54 +++++++ core/store/src/trie/mod.rs | 19 --- core/store/src/trie/shard_tries.rs | 7 +- core/store/src/trie/trie_storage.rs | 45 +----- core/store/src/validate.rs | 1 - neard/src/lib.rs | 10 +- 14 files changed, 448 insertions(+), 236 deletions(-) create mode 100644 core/store/src/db/refcount.rs create mode 100644 core/store/src/db/v6.rs create mode 100644 core/store/src/migrations/v6_to_v7.rs delete mode 100644 core/store/src/validate.rs diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 63740f10d18..15605fe42c7 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -40,8 +40,8 @@ use near_store::{ ColInvalidChunks, ColLastBlockWithNewChunk, ColNextBlockHashes, ColNextBlockWithNewChunk, ColOutcomesByBlockHash, ColOutgoingReceipts, ColPartialChunks, ColProcessedBlockHeights, ColReceiptIdToShardId, ColState, ColStateChanges, ColStateDlInfos, ColStateHeaders, - ColStateParts, ColTransactionRefCount, ColTransactionResult, ColTransactions, ColTrieChanges, - DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate, TrieChanges, WrappedTrieChanges, + ColStateParts, ColTransactionResult, ColTransactions, ColTrieChanges, DBCol, + KeyForStateChanges, ShardTries, Store, StoreUpdate, TrieChanges, WrappedTrieChanges, CHUNK_TAIL_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, SHOULD_COL_GC, SYNC_HEAD_KEY, TAIL_KEY, }; @@ -162,8 +162,6 @@ pub trait ChainStoreAccess { &mut self, height: BlockHeight, ) -> Result, Error>; - /// Returns a number of references for Transaction with `tx_hash` - fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result; /// Returns a number of references for Block with `block_hash` fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error>; /// Check if we saw chunk hash at given height and shard id. @@ -743,14 +741,6 @@ impl ChainStoreAccess for ChainStore { } } - fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result { - match self.store.get_ser(ColTransactionRefCount, tx_hash.as_ref()) { - Ok(Some(value)) => Ok(value), - Ok(None) => Ok(0), - Err(e) => Err(e.into()), - } - } - fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error> { option_to_not_found( read_with_cache( @@ -1124,7 +1114,6 @@ struct ChainStoreCacheUpdate { next_block_with_new_chunk: HashMap<(CryptoHash, ShardId), CryptoHash>, last_block_with_new_chunk: HashMap, transactions: HashSet, - tx_refcounts: HashMap, block_refcounts: HashMap, block_merkle_tree: HashMap, block_ordinal_to_hash: HashMap, @@ -1372,14 +1361,6 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> { self.chain_store.get_all_chunk_hashes_by_height(height) } - fn get_tx_refcount(&mut self, tx_hash: &CryptoHash) -> Result { - if let Some(refcount) = self.chain_store_cache_update.tx_refcounts.get(tx_hash) { - Ok(*refcount) - } else { - self.chain_store.get_tx_refcount(tx_hash) - } - } - fn get_block_refcount(&mut self, block_hash: &CryptoHash) -> Result<&u64, Error> { if let Some(refcount) = self.chain_store_cache_update.block_refcounts.get(block_hash) { Ok(refcount) @@ -1994,7 +1975,7 @@ impl<'a> ChainStoreUpdate<'a> { self.gc_col(ColReceiptIdToShardId, &receipt.receipt_id.into()); } for transaction in chunk.transactions { - self.gc_col_transaction(transaction.get_hash())?; + self.gc_col(ColTransactions, &transaction.get_hash().into()); } // 2. Delete chunk_hash-indexed data @@ -2225,23 +2206,6 @@ impl<'a> ChainStoreUpdate<'a> { Ok(()) } - pub fn gc_col_transaction(&mut self, tx_hash: CryptoHash) -> Result<(), Error> { - let mut refcount = self.get_tx_refcount(&tx_hash)?; - if refcount == 0 { - debug_assert!(false, "ColTransactionRefCount inconsistency"); - return Err( - ErrorKind::GCError("ColTransactionRefCount inconsistency".to_string()).into() - ); - } - refcount -= 1; - self.chain_store_cache_update.tx_refcounts.insert(tx_hash, refcount); - if refcount == 0 { - self.gc_col(ColTransactionRefCount, &tx_hash.into()); - self.gc_col(ColTransactions, &tx_hash.into()); - } - Ok(()) - } - fn gc_col(&mut self, col: DBCol, key: &Vec) { assert!(SHOULD_COL_GC[col as usize]); let mut store_update = self.store().store_update(); @@ -2291,11 +2255,11 @@ impl<'a> ChainStoreUpdate<'a> { self.chain_store.block_refcounts.cache_remove(key); } DBCol::ColReceiptIdToShardId => { - store_update.delete(col, key); + store_update.update_refcount(col, key, &[], -1); self.chain_store.receipt_id_to_shard_id.cache_remove(key); } DBCol::ColTransactions => { - store_update.delete(col, key); + store_update.update_refcount(col, key, &[], -1); self.chain_store.transactions.cache_remove(key); } DBCol::ColChunks => { @@ -2347,9 +2311,6 @@ impl<'a> ChainStoreUpdate<'a> { store_update.delete(col, key); self.chain_store.last_block_with_new_chunk.cache_remove(key); } - DBCol::ColTransactionRefCount => { - store_update.delete(col, key); - } DBCol::ColProcessedBlockHeights => { store_update.delete(col, key); self.chain_store.processed_block_heights.cache_remove(key); @@ -2368,7 +2329,8 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::ColComponentEdges | DBCol::ColEpochInfo | DBCol::ColEpochStart - | DBCol::ColBlockOrdinal => { + | DBCol::ColBlockOrdinal + | DBCol::_ColTransactionRefCount => { unreachable!(); } } @@ -2471,9 +2433,8 @@ impl<'a> ChainStoreUpdate<'a> { // Increase transaction refcounts for all included txs for tx in chunk.transactions.iter() { - let mut refcount = self.chain_store.get_tx_refcount(&tx.get_hash())?; - refcount += 1; - store_update.set_ser(ColTransactionRefCount, tx.get_hash().as_ref(), &refcount)?; + let bytes = tx.try_to_vec().expect("Borsh cannot fail"); + store_update.update_refcount(ColTransactions, tx.get_hash().as_ref(), &bytes, 1) } store_update.set_ser(ColChunks, chunk_hash.as_ref(), chunk)?; @@ -2542,7 +2503,8 @@ impl<'a> ChainStoreUpdate<'a> { store_update.set_ser(ColOutcomesByBlockHash, block_hash.as_ref(), &hash_set)?; } for (receipt_id, shard_id) in self.chain_store_cache_update.receipt_id_to_shard_id.iter() { - store_update.set_ser(ColReceiptIdToShardId, receipt_id.as_ref(), shard_id)?; + let data = shard_id.try_to_vec()?; + store_update.update_refcount(ColReceiptIdToShardId, receipt_id.as_ref(), &data, 1); } for ((block_hash, shard_id), next_block_hash) in self.chain_store_cache_update.next_block_with_new_chunk.iter() @@ -2561,16 +2523,6 @@ impl<'a> ChainStoreUpdate<'a> { block_hash, )?; } - for transaction in self.chain_store_cache_update.transactions.iter() { - store_update.set_ser(ColTransactions, transaction.get_hash().as_ref(), transaction)?; - } - for (tx_hash, refcount) in self.chain_store_cache_update.tx_refcounts.drain() { - // tx_refcounts cache is used in GC only. - // While increasing, we write to the storage directly because we add no transaction twice. - if refcount > 0 { - store_update.set_ser(ColTransactionRefCount, &tx_hash.as_ref(), &refcount)?; - } - } for (block_hash, refcount) in self.chain_store_cache_update.block_refcounts.iter() { store_update.set_ser(ColBlockRefCount, block_hash.as_ref(), refcount)?; } diff --git a/chain/chain/src/store_validator.rs b/chain/chain/src/store_validator.rs index 5b01100bb8e..1b1ed32a41b 100644 --- a/chain/chain/src/store_validator.rs +++ b/chain/chain/src/store_validator.rs @@ -16,7 +16,9 @@ use near_primitives::syncing::{ShardStateSyncResponseHeader, StateHeaderKey, Sta use near_primitives::transaction::ExecutionOutcomeWithIdAndProof; use near_primitives::types::{AccountId, BlockHeight, EpochId, GCCount, ShardId}; use near_primitives::utils::get_block_shard_id_rev; -use near_store::{DBCol, Store, TrieChanges, NUM_COLS, SHOULD_COL_GC, SKIP_COL_GC}; +use near_store::{ + decode_value_with_rc, DBCol, Store, TrieChanges, NUM_COLS, SHOULD_COL_GC, SKIP_COL_GC, +}; use validate::StoreValidatorError; use crate::RuntimeAdapter; @@ -128,7 +130,7 @@ impl StoreValidator { self.errors.push(ErrorMessage { key: to_string(&key), col: to_string(&col), err }) } fn validate_col(&mut self, col: DBCol) -> Result<(), StoreValidatorError> { - for (key, value) in self.store.clone().iter(col) { + for (key, value) in self.store.clone().iter_without_rc_logic(col) { let key_ref = key.as_ref(); let value_ref = value.as_ref(); match col { @@ -277,10 +279,10 @@ impl StoreValidator { let count = GCCount::try_from_slice(value_ref)?; self.check(&validate::gc_col_count, &col, &count, col); } - DBCol::ColTransactionRefCount => { + DBCol::ColTransactions => { + let (_value, rc) = decode_value_with_rc(value_ref); let tx_hash = CryptoHash::try_from(key_ref)?; - let refcount = u64::try_from_slice(value_ref)?; - self.check(&validate::tx_refcount, &tx_hash, &refcount, col); + self.check(&validate::tx_refcount, &tx_hash, &(rc as u64), col); } DBCol::ColBlockRefCount => { let block_hash = CryptoHash::try_from(key_ref)?; @@ -340,7 +342,7 @@ impl StoreValidator { } // Check that all refs are counted if let Err(e) = validate::tx_refcount_final(self) { - self.process_error(e, "TX_REFCOUNT", DBCol::ColTransactionRefCount) + self.process_error(e, "TX_REFCOUNT", DBCol::ColTransactions) } // Check that all Block Refcounts are counted if let Err(e) = validate::block_refcount_final(self) { diff --git a/chain/chain/src/store_validator/validate.rs b/chain/chain/src/store_validator/validate.rs index ecad51e2cef..36f3d2271e2 100644 --- a/chain/chain/src/store_validator/validate.rs +++ b/chain/chain/src/store_validator/validate.rs @@ -714,15 +714,13 @@ pub(crate) fn tx_refcount( tx_hash: &CryptoHash, refcount: &u64, ) -> Result<(), StoreValidatorError> { - if let Some(expected) = sv.inner.tx_refcount.get(tx_hash) { - if refcount != expected { - err!("Invalid tx refcount, expected {:?}, found {:?}", expected, refcount) - } else { - sv.inner.tx_refcount.remove(tx_hash); - return Ok(()); - } + let expected = sv.inner.tx_refcount.get(tx_hash).map(|&rc| rc).unwrap_or_default(); + if *refcount != expected { + err!("Invalid tx refcount, expected {:?}, found {:?}", expected, refcount) + } else { + sv.inner.tx_refcount.remove(tx_hash); + return Ok(()); } - err!("Unexpected Tx found") } pub(crate) fn block_refcount( diff --git a/core/store/src/db.rs b/core/store/src/db.rs index da76e1c6ed2..900d26fac01 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -11,15 +11,17 @@ use borsh::{BorshDeserialize, BorshSerialize}; use rocksdb::Env; use rocksdb::{ BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode, - MergeOperands, Options, ReadOptions, WriteBatch, DB, + Options, ReadOptions, WriteBatch, DB, }; use strum_macros::EnumIter; -use crate::trie::merge_refcounted_records; +use crate::db::refcount::merge_refcounted_records; use near_primitives::version::DbVersion; -use rocksdb::compaction_filter::Decision; use std::marker::PhantomPinned; +pub(crate) mod refcount; +pub(crate) mod v6; + #[derive(Debug, Clone, PartialEq)] pub struct DBError(rocksdb::Error); @@ -104,7 +106,8 @@ pub enum DBCol { ColGCCount = 41, /// GC helper column to get all Outcome ids by Block Hash ColOutcomesByBlockHash = 42, - ColTransactionRefCount = 43, + /// Deprecated + _ColTransactionRefCount = 43, /// Heights of blocks that have been processed ColProcessedBlockHeights = 44, } @@ -158,13 +161,19 @@ impl std::fmt::Display for DBCol { Self::ColBlockOrdinal => "block ordinal", Self::ColGCCount => "gc count", Self::ColOutcomesByBlockHash => "outcomes by block hash", - Self::ColTransactionRefCount => "refcount per transaction", + Self::_ColTransactionRefCount => "refcount per transaction (deprecated)", Self::ColProcessedBlockHeights => "processed block heights", }; write!(formatter, "{}", desc) } } +impl DBCol { + pub fn is_rc(&self) -> bool { + IS_COL_RC[*self as usize] + } +} + // List of columns for which GC should be implemented lazy_static! { pub static ref SHOULD_COL_GC: Vec = { @@ -201,6 +210,17 @@ lazy_static! { }; } +// List of reference counted columns +lazy_static! { + pub static ref IS_COL_RC: Vec = { + let mut col_rc = vec![false; NUM_COLS]; + col_rc[DBCol::ColState as usize] = true; + col_rc[DBCol::ColTransactions as usize] = true; + col_rc[DBCol::ColReceiptIdToShardId as usize] = true; + col_rc + }; +} + pub const HEAD_KEY: &[u8; 4] = b"HEAD"; pub const TAIL_KEY: &[u8; 4] = b"TAIL"; pub const CHUNK_TAIL_KEY: &[u8; 10] = b"CHUNK_TAIL"; @@ -269,6 +289,10 @@ pub trait Database: Sync + Send { } fn get(&self, col: DBCol, key: &[u8]) -> Result>, DBError>; fn iter<'a>(&'a self, column: DBCol) -> Box, Box<[u8]>)> + 'a>; + fn iter_without_rc_logic<'a>( + &'a self, + column: DBCol, + ) -> Box, Box<[u8]>)> + 'a>; fn iter_prefix<'a>( &'a self, col: DBCol, @@ -281,7 +305,19 @@ impl Database for RocksDB { fn get(&self, col: DBCol, key: &[u8]) -> Result>, DBError> { let read_options = rocksdb_read_options(); let result = self.db.get_cf_opt(unsafe { &*self.cfs[col as usize] }, key, &read_options)?; - Ok(RocksDB::empty_value_filtering_get(col, result)) + Ok(RocksDB::get_with_rc_logic(col, result)) + } + + fn iter_without_rc_logic<'a>( + &'a self, + col: DBCol, + ) -> Box, Box<[u8]>)> + 'a> { + let read_options = rocksdb_read_options(); + unsafe { + let cf_handle = &*self.cfs[col as usize]; + let iterator = self.db.iterator_cf_opt(cf_handle, read_options, IteratorMode::Start); + Box::new(iterator) + } } fn iter<'a>(&'a self, col: DBCol) -> Box, Box<[u8]>)> + 'a> { @@ -289,7 +325,7 @@ impl Database for RocksDB { unsafe { let cf_handle = &*self.cfs[col as usize]; let iterator = self.db.iterator_cf_opt(cf_handle, read_options, IteratorMode::Start); - RocksDB::empty_value_filtering_iter(col, iterator) + RocksDB::iter_with_rc_logic(col, iterator) } } @@ -314,7 +350,7 @@ impl Database for RocksDB { IteratorMode::From(key_prefix, Direction::Forward), ) .take_while(move |(key, _value)| key.starts_with(key_prefix)); - RocksDB::empty_value_filtering_iter(col, iterator) + RocksDB::iter_with_rc_logic(col, iterator) } } @@ -323,9 +359,11 @@ impl Database for RocksDB { for op in transaction.ops { match op { DBOp::Insert { col, key, value } => unsafe { + assert!(!col.is_rc()); batch.put_cf(&*self.cfs[col as usize], key, value); }, DBOp::UpdateRefcount { col, key, value } => unsafe { + assert!(col.is_rc()); batch.merge_cf(&*self.cfs[col as usize], key, value); }, DBOp::Delete { col, key } => unsafe { @@ -339,10 +377,19 @@ impl Database for RocksDB { impl Database for TestDB { fn get(&self, col: DBCol, key: &[u8]) -> Result>, DBError> { - Ok(self.db.read().unwrap()[col as usize].get(key).cloned()) + let result = self.db.read().unwrap()[col as usize].get(key).cloned(); + Ok(RocksDB::get_with_rc_logic(col, result)) } fn iter<'a>(&'a self, col: DBCol) -> Box, Box<[u8]>)> + 'a> { + let iterator = self.iter_without_rc_logic(col); + RocksDB::iter_with_rc_logic(col, iterator) + } + + fn iter_without_rc_logic<'a>( + &'a self, + col: DBCol, + ) -> Box, Box<[u8]>)> + 'a> { let iterator = self.db.read().unwrap()[col as usize] .clone() .into_iter() @@ -355,7 +402,10 @@ impl Database for TestDB { col: DBCol, key_prefix: &'a [u8], ) -> Box, Box<[u8]>)> + 'a> { - Box::new(self.iter(col).filter(move |(key, _value)| key.starts_with(key_prefix))) + RocksDB::iter_with_rc_logic( + col, + self.iter(col).filter(move |(key, _value)| key.starts_with(key_prefix)), + ) } fn write(&self, transaction: DBTransaction) -> Result<(), DBError> { @@ -365,7 +415,7 @@ impl Database for TestDB { DBOp::Insert { col, key, value } => db[col as usize].insert(key, value), DBOp::UpdateRefcount { col, key, value } => { let mut val = db[col as usize].get(&key).cloned().unwrap_or_default(); - merge_refcounted_records(&mut val, &value).unwrap(); + merge_refcounted_records(&mut val, &value); if val.len() != 0 { db[col as usize].insert(key, val) } else { @@ -434,7 +484,7 @@ fn rocksdb_column_options(col: DBCol) -> Options { opts.optimize_level_style_compaction(1024 * 1024 * 128); opts.set_target_file_size_base(1024 * 1024 * 64); opts.set_compression_per_level(&[]); - if col == DBCol::ColState { + if col.is_rc() { opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, None); opts.set_compaction_filter("empty value filter", RocksDB::empty_value_compaction_filter); } @@ -504,63 +554,6 @@ impl TestDB { } } -impl RocksDB { - /// ColState has refcounted values. - /// Merge adds refcounts, zero refcount becomes empty value. - /// Empty values get filtered by get methods, and removed by compaction. - fn refcount_merge( - _new_key: &[u8], - existing_val: Option<&[u8]>, - operands: &mut MergeOperands, - ) -> Option> { - let mut result = vec![]; - if let Some(val) = existing_val { - // Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes) - merge_refcounted_records(&mut result, val) - .expect("Not a refcounted record in ColState"); - } - for val in operands { - // Error is only possible if decoding refcount fails (=value is between 1 and 3 bytes) - merge_refcounted_records(&mut result, val) - .expect("Not a refcounted record in ColState"); - } - Some(result) - } - - /// Compaction filter for ColState - fn empty_value_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision { - if value.is_empty() { - Decision::Remove - } else { - Decision::Keep - } - } - - /// ColState get() treats empty value as no value - fn empty_value_filtering_get(column: DBCol, value: Option>) -> Option> { - if column == DBCol::ColState && Some(vec![]) == value { - None - } else { - value - } - } - - /// ColState iterator treats empty value as no value - fn empty_value_filtering_iter<'a, I>( - column: DBCol, - iterator: I, - ) -> Box, Box<[u8]>)> + 'a> - where - I: Iterator, Box<[u8]>)> + 'a, - { - if column == DBCol::ColState { - Box::new(iterator.filter(|(_k, v)| !v.is_empty())) - } else { - Box::new(iterator) - } - } -} - #[cfg(test)] mod tests { use crate::db::DBCol::ColState; @@ -593,33 +586,41 @@ mod tests { fn rocksdb_merge_sanity() { let tmp_dir = tempfile::Builder::new().prefix("_test_snapshot_sanity").tempdir().unwrap(); let store = create_store(tmp_dir.path().to_str().unwrap()); + let ptr = (&*store.storage) as *const (dyn Database + 'static); + let rocksdb = unsafe { &*(ptr as *const RocksDB) }; assert_eq!(store.get(ColState, &[1]).unwrap(), None); { let mut store_update = store.store_update(); - store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]); + store_update.update_refcount(ColState, &[1], &[1], 1); store_update.commit().unwrap(); } { let mut store_update = store.store_update(); - store_update.update_refcount(ColState, &[1], &[1, 1, 0, 0, 0]); + store_update.update_refcount(ColState, &[1], &[1], 1); store_update.commit().unwrap(); } - assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 2, 0, 0, 0])); + assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1])); + assert_eq!( + rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), + Some(vec![1, 2, 0, 0, 0, 0, 0, 0, 0]) + ); { let mut store_update = store.store_update(); - store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]); + store_update.update_refcount(ColState, &[1], &[1], -1); store_update.commit().unwrap(); } - assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1, 1, 0, 0, 0])); + assert_eq!(store.get(ColState, &[1]).unwrap(), Some(vec![1])); + assert_eq!( + rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), + Some(vec![1, 1, 0, 0, 0, 0, 0, 0, 0]) + ); { let mut store_update = store.store_update(); - store_update.update_refcount(ColState, &[1], &[1, 255, 255, 255, 255]); + store_update.update_refcount(ColState, &[1], &[1], -1); store_update.commit().unwrap(); } // Refcount goes to 0 -> get() returns None assert_eq!(store.get(ColState, &[1]).unwrap(), None); - let ptr = (&*store.storage) as *const (dyn Database + 'static); - let rocksdb = unsafe { &*(ptr as *const RocksDB) }; // Internally there is an empty value assert_eq!(rocksdb.get_no_empty_filtering(ColState, &[1]).unwrap(), Some(vec![])); diff --git a/core/store/src/db/refcount.rs b/core/store/src/db/refcount.rs new file mode 100644 index 00000000000..7ced123fbdb --- /dev/null +++ b/core/store/src/db/refcount.rs @@ -0,0 +1,136 @@ +use crate::db::RocksDB; +use crate::DBCol; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use rocksdb::compaction_filter::Decision; +use rocksdb::MergeOperands; +use std::cmp::Ordering; +use std::io::{Cursor, Write}; + +/// Refcounted columns store value with rc. +/// +/// Write write refcount records, reads merge them. +/// The merged rc should always be positive, if it's not there is a bug in gc. +/// +/// Refcount record format: +/// rc = 0 => empty +/// rc < 0 => 8 bytes little endian rc +/// rc > 0 => value followed by 8 bytes little endian rc +/// +pub(crate) fn merge_refcounted_records(result: &mut Vec, val: &[u8]) { + let (bytes, add_rc) = decode_value_with_rc(val); + if add_rc == 0 { + return; + } + let (result_bytes, result_rc) = decode_value_with_rc(result); + if result_rc == 0 { + result.extend_from_slice(val); + } else { + let rc = result_rc + add_rc; + debug_assert!(result_rc <= 0 || add_rc <= 0 || result_bytes == bytes); + match rc.cmp(&0) { + Ordering::Less => { + result.clear(); + result.extend_from_slice(&rc.to_le_bytes()); + } + Ordering::Equal => { + result.clear(); + } + Ordering::Greater => { + if result_rc < 0 { + result.clear(); + result.extend_from_slice(val); + } + let len = result.len(); + result[len - 8..].copy_from_slice(&rc.to_le_bytes()); + } + } + } +} + +/// Returns +/// (Some(value), rc) if rc > 0 +/// (None, rc) if rc <= 0 +pub fn decode_value_with_rc(bytes: &[u8]) -> (Option<&[u8]>, i64) { + if bytes.len() < 8 { + debug_assert!(bytes.is_empty()); + return (None, 0); + } + let mut cursor = Cursor::new(&bytes[bytes.len() - 8..]); + let rc = cursor.read_i64::().unwrap(); + if rc < 0 { + (None, rc) + } else { + (Some(&bytes[..bytes.len() - 8]), rc) + } +} + +pub(crate) fn encode_value_with_rc(data: &[u8], rc: i64) -> Vec { + if rc == 0 { + return vec![]; + } + let mut cursor = Cursor::new(Vec::with_capacity(data.len() + 8)); + if rc > 0 { + cursor.write_all(data).unwrap(); + } + cursor.write_i64::(rc).unwrap(); + cursor.into_inner() +} + +impl RocksDB { + /// ColState has refcounted values. + /// Merge adds refcounts, zero refcount becomes empty value. + /// Empty values get filtered by get methods, and removed by compaction. + pub(crate) fn refcount_merge( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, + ) -> Option> { + let mut result = vec![]; + if let Some(val) = existing_val { + merge_refcounted_records(&mut result, val); + } + for val in operands { + merge_refcounted_records(&mut result, val); + } + Some(result) + } + + /// Compaction filter for ColState + pub(crate) fn empty_value_compaction_filter( + _level: u32, + _key: &[u8], + value: &[u8], + ) -> Decision { + if value.is_empty() { + Decision::Remove + } else { + Decision::Keep + } + } + + /// Treats empty value as no value and strips refcount + pub(crate) fn get_with_rc_logic(column: DBCol, value: Option>) -> Option> { + if column.is_rc() { + value.and_then(|vec| decode_value_with_rc(&vec).0.map(|v| v.to_vec())) + } else { + value + } + } + + /// Iterator treats empty value as no value and strips refcount + pub(crate) fn iter_with_rc_logic<'a, I>( + column: DBCol, + iterator: I, + ) -> Box, Box<[u8]>)> + 'a> + where + I: Iterator, Box<[u8]>)> + 'a, + { + if column.is_rc() { + Box::new(iterator.filter_map(|(k, v_rc)| { + decode_value_with_rc(&v_rc).0.map(|v| (k, v.to_vec().into_boxed_slice())) + })) + } else { + Box::new(iterator) + } + } +} diff --git a/core/store/src/db/v6.rs b/core/store/src/db/v6.rs new file mode 100644 index 00000000000..8df069afb3e --- /dev/null +++ b/core/store/src/db/v6.rs @@ -0,0 +1,85 @@ +use crate::db::{rocksdb_column_options, rocksdb_options, DBError, RocksDB}; +use crate::DBCol; +use byteorder::{LittleEndian, ReadBytesExt}; +use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, MergeOperands, Options, DB}; +use std::io::Cursor; +use std::marker::PhantomPinned; + +fn refcount_merge_v6( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &mut MergeOperands, +) -> Option> { + let mut result = vec![]; + if let Some(val) = existing_val { + merge_refcounted_records_v6(&mut result, val); + } + for val in operands { + merge_refcounted_records_v6(&mut result, val); + } + Some(result) +} + +fn vec_to_rc(bytes: &[u8]) -> i32 { + let mut cursor = Cursor::new(&bytes[bytes.len() - 4..]); + cursor.read_i32::().unwrap() +} + +fn merge_refcounted_records_v6(result: &mut Vec, val: &[u8]) -> () { + if val.is_empty() { + return; + } + let add_rc = vec_to_rc(val); + if !result.is_empty() { + let result_rc = vec_to_rc(result) + add_rc; + + debug_assert_eq!(result[0..(result.len() - 4)], val[0..(val.len() - 4)]); + let len = result.len(); + result[(len - 4)..].copy_from_slice(&result_rc.to_le_bytes()); + if result_rc == 0 { + *result = vec![]; + } + } else { + *result = val.to_vec(); + } +} + +fn rocksdb_column_options_v6(col: DBCol) -> Options { + let mut opts = rocksdb_column_options(DBCol::ColDbVersion); + + if col == DBCol::ColState { + opts.set_merge_operator("refcount merge", refcount_merge_v6, None); + opts.set_compaction_filter("empty value filter", RocksDB::empty_value_compaction_filter); + } + opts +} + +impl RocksDB { + pub(crate) fn new_v6>(path: P) -> Result { + use strum::IntoEnumIterator; + + let options = rocksdb_options(); + // Open only the columns used by migration + let cols_to_open = [ + DBCol::ColDbVersion, + DBCol::ColState, + DBCol::ColTransactions, + DBCol::_ColTransactionRefCount, + ]; + + let cf_descriptors = cols_to_open.iter().map(|&col| { + ColumnFamilyDescriptor::new( + format!("col{}", col as usize), + rocksdb_column_options_v6(col), + ) + }); + let db = DB::open_cf_descriptors(&options, path, cf_descriptors)?; + + let mut cfs = vec![std::ptr::null::(); DBCol::iter().count()]; + cols_to_open.iter().for_each(|&col| { + let name = format!("col{}", col as usize); + cfs[col as usize] = db.cf_handle(&name).unwrap() as *const ColumnFamily; + }); + Ok(Self { db, cfs, _pin: PhantomPinned }) + } +} diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 1fc70f7513f..f2d7abd2514 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -26,6 +26,8 @@ use near_primitives::serialize::to_base; use near_primitives::trie_key::{trie_key_parsers, TrieKey}; use near_primitives::types::AccountId; +pub use crate::db::refcount::decode_value_with_rc; +use crate::db::refcount::encode_value_with_rc; use crate::db::{DBOp, DBTransaction, Database, RocksDB}; pub use crate::trie::{ iterator::TrieIterator, update::TrieUpdate, update::TrieUpdateIterator, @@ -83,6 +85,13 @@ impl Store { self.storage.iter(column) } + pub fn iter_without_rc_logic<'a>( + &'a self, + column: DBCol, + ) -> Box, Box<[u8]>)> + 'a> { + self.storage.iter_without_rc_logic(column) + } + pub fn iter_prefix<'a>( &'a self, column: DBCol, @@ -105,7 +114,7 @@ impl Store { pub fn save_to_file(&self, column: DBCol, filename: &Path) -> Result<(), std::io::Error> { let mut file = File::create(filename)?; - for (key, value) in self.storage.iter(column) { + for (key, value) in self.storage.iter_without_rc_logic(column) { file.write_u32::(key.len() as u32)?; file.write_all(&key)?; file.write_u32::(value.len() as u32)?; @@ -154,7 +163,9 @@ impl StoreUpdate { StoreUpdate { storage, transaction, tries: Some(tries) } } - pub fn update_refcount(&mut self, column: DBCol, key: &[u8], value: &[u8]) { + pub fn update_refcount(&mut self, column: DBCol, key: &[u8], value: &[u8], rc_delta: i64) { + debug_assert!(column.is_rc()); + let value = encode_value_with_rc(value, rc_delta); self.transaction.update_refcount(column, key, value) } @@ -168,6 +179,7 @@ impl StoreUpdate { key: &[u8], value: &T, ) -> Result<(), io::Error> { + debug_assert!(!column.is_rc()); let data = value.try_to_vec()?; self.set(column, key, &data); Ok(()) @@ -206,24 +218,26 @@ impl StoreUpdate { } } + #[cfg(debug_assertions)] + fn transaction_overwrites_itself(ops: &Vec) -> bool { + let non_refcount_keys = ops + .iter() + .filter_map(|op| match op { + DBOp::Insert { col, key, .. } => Some((*col as u8, key)), + DBOp::Delete { col, key } => Some((*col as u8, key)), + DBOp::UpdateRefcount { .. } => None, + }) + .collect::>(); + return non_refcount_keys.len() + != non_refcount_keys.iter().collect::>().len(); + } + pub fn commit(self) -> Result<(), io::Error> { - /* TODO: enable after #3169 is fixed - debug_assert!( - self.transaction.ops.len() - == self - .transaction - .ops - .iter() - .map(|op| match op { - DBOp::Insert { col, key, .. } => (*col as u8, key), - DBOp::Delete { col, key } => (*col as u8, key), - DBOp::UpdateRefcount { col, key, .. } => (*col as u8, key), - }) - .collect::>() - .len(), + debug_assert!( + !Self::transaction_overwrites_itself(&self.transaction.ops), "Transaction overwrites itself: {:?}", self - );*/ + ); if let Some(tries) = self.tries { assert_eq!( tries.get_store().storage.deref() as *const _, diff --git a/core/store/src/migrations.rs b/core/store/src/migrations.rs index 138ea075be4..c262a7d3fdb 100644 --- a/core/store/src/migrations.rs +++ b/core/store/src/migrations.rs @@ -8,20 +8,28 @@ use near_primitives::transaction::ExecutionOutcomeWithIdAndProof; use near_primitives::version::DbVersion; use crate::db::{DBCol, RocksDB, VERSION_KEY}; -use crate::Store; +use crate::migrations::v6_to_v7::{col_state_refcount_8byte, migrate_col_transaction_refcount}; +use crate::{Store, StoreUpdate}; use near_primitives::sharding::ShardChunk; +use std::sync::Arc; + +pub mod v6_to_v7; pub fn get_store_version(path: &str) -> DbVersion { RocksDB::get_version(path).expect("Failed to open the database") } -pub fn set_store_version(store: &Store, db_version: u32) { - let mut store_update = store.store_update(); +fn set_store_version_inner(store_update: &mut StoreUpdate, db_version: u32) { store_update.set( DBCol::ColDbVersion, VERSION_KEY, &serde_json::to_vec(&db_version).expect("Failed to serialize version"), ); +} + +pub fn set_store_version(store: &Store, db_version: u32) { + let mut store_update = store.store_update(); + set_store_version_inner(&mut store_update, db_version); store_update.commit().expect("Failed to write version to database"); } @@ -78,8 +86,18 @@ pub fn fill_col_transaction_refcount(store: &Store) { } for (tx_hash, refcount) in tx_refcount { store_update - .set_ser(DBCol::ColTransactionRefCount, tx_hash.as_ref(), &refcount) + .set_ser(DBCol::_ColTransactionRefCount, tx_hash.as_ref(), &refcount) .expect("BorshSerialize should not fail"); } store_update.commit().expect("Failed to migrate"); } + +pub fn migrate_6_to_7(path: &String) { + let db = Arc::pin(RocksDB::new_v6(path).expect("Failed to open the database")); + let store = Store::new(db); + let mut store_update = store.store_update(); + col_state_refcount_8byte(&store, &mut store_update); + migrate_col_transaction_refcount(&store, &mut store_update); + set_store_version_inner(&mut store_update, 7); + store_update.commit().expect("Failed to migrate") +} diff --git a/core/store/src/migrations/v6_to_v7.rs b/core/store/src/migrations/v6_to_v7.rs new file mode 100644 index 00000000000..2ec1164377f --- /dev/null +++ b/core/store/src/migrations/v6_to_v7.rs @@ -0,0 +1,54 @@ +use crate::db::refcount::encode_value_with_rc; +use crate::{DBCol, Store, StoreUpdate}; +use borsh::ser::BorshSerialize; +use near_primitives::borsh::BorshDeserialize; +use near_primitives::hash::CryptoHash; +use near_primitives::transaction::SignedTransaction; +use std::collections::HashMap; + +// Refcount from i32 to i64 +pub(crate) fn col_state_refcount_8byte(store: &Store, store_update: &mut StoreUpdate) { + for (k, v) in store.iter_without_rc_logic(DBCol::ColState) { + if v.len() < 4 { + store_update.delete(DBCol::ColState, &k); + continue; + } + let mut v = v.into_vec(); + v.extend_from_slice(&[0, 0, 0, 0]); + store_update.set(DBCol::ColState, &k, &v); + } +} + +// Deprecate ColTransactionRefCount, move the info to ColTransactions +pub(crate) fn migrate_col_transaction_refcount(store: &Store, store_update: &mut StoreUpdate) { + let transactions: Vec = store + .iter_without_rc_logic(DBCol::ColTransactions) + .map(|(_key, value)| { + SignedTransaction::try_from_slice(&value).expect("BorshDeserialize should not fail") + }) + .collect(); + let tx_refcount: HashMap = store + .iter(DBCol::_ColTransactionRefCount) + .map(|(key, value)| { + ( + CryptoHash::try_from_slice(&key).expect("BorshDeserialize should not fail"), + u64::try_from_slice(&value).expect("BorshDeserialize should not fail"), + ) + }) + .collect(); + + assert_eq!(transactions.len(), tx_refcount.len()); + + for tx in transactions { + let tx_hash = tx.get_hash(); + let bytes = tx.try_to_vec().expect("BorshSerialize should not fail"); + let rc = *tx_refcount.get(&tx_hash).expect("Inconsistent tx refcount data") as i64; + assert!(rc > 0); + store_update.set( + DBCol::ColTransactions, + tx_hash.as_ref(), + &encode_value_with_rc(&bytes, rc), + ); + store_update.delete(DBCol::_ColTransactionRefCount, tx_hash.as_ref()); + } +} diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 6865bf4c471..fc00662ea82 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -17,7 +17,6 @@ use crate::trie::insert_delete::NodesStorage; use crate::trie::iterator::TrieIterator; use crate::trie::nibble_slice::NibbleSlice; pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges}; -pub(crate) use crate::trie::trie_storage::merge_refcounted_records; use crate::trie::trie_storage::{ TouchedNodesCounter, TrieCachingStorage, TrieMemoryPartialStorage, TrieRecordingStorage, TrieStorage, @@ -396,24 +395,6 @@ impl RawTrieNodeWithSize { } } -fn encode_trie_node_with_rc(data: &[u8], rc: i32) -> Vec { - let mut cursor = Cursor::new(Vec::with_capacity(data.len() + 4)); - cursor.write_all(data).unwrap(); - cursor.write_i32::(rc).unwrap(); - cursor.into_inner() -} - -fn decode_trie_node_with_rc(bytes: &[u8]) -> Result<(&[u8], i32), StorageError> { - if bytes.len() < 4 { - return Err(StorageError::StorageInconsistentState( - "Decode node with RC failed".to_string(), - )); - } - let mut cursor = Cursor::new(&bytes[bytes.len() - 4..]); - let rc = cursor.read_i32::().unwrap(); - Ok((&bytes[..bytes.len() - 4], rc)) -} - pub struct Trie { pub(crate) storage: Box, pub counter: TouchedNodesCounter, diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index ceaee58b2d1..6e8d1832c4e 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -1,5 +1,4 @@ use crate::db::{DBCol, DBOp, DBTransaction}; -use crate::trie::encode_trie_node_with_rc; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; use crate::{StorageError, Store, StoreUpdate, Trie, TrieChanges, TrieUpdate}; use borsh::BorshSerialize; @@ -74,8 +73,7 @@ impl ShardTries { store_update.tries = Some(tries.clone()); for (hash, value, rc) in deletions.iter() { let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash); - let bytes = encode_trie_node_with_rc(&value, -(*rc as i32)); - store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes); + store_update.update_refcount(DBCol::ColState, key.as_ref(), &value, -(*rc as i64)); } Ok(()) } @@ -89,8 +87,7 @@ impl ShardTries { store_update.tries = Some(tries); for (hash, value, rc) in insertions.iter() { let key = TrieCachingStorage::get_key_from_shard_id_and_hash(shard_id, hash); - let bytes = encode_trie_node_with_rc(&value, *rc as i32); - store_update.update_refcount(DBCol::ColState, key.as_ref(), &bytes); + store_update.update_refcount(DBCol::ColState, key.as_ref(), &value, *rc as i64); } Ok(()) } diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs index 636d3c84cff..2abbbbd7e60 100644 --- a/core/store/src/trie/trie_storage.rs +++ b/core/store/src/trie/trie_storage.rs @@ -6,7 +6,8 @@ use cached::{Cached, SizedCache}; use near_primitives::hash::CryptoHash; -use crate::trie::{decode_trie_node_with_rc, POISONED_LOCK_ERR}; +use crate::db::refcount::decode_value_with_rc; +use crate::trie::POISONED_LOCK_ERR; use crate::{ColState, StorageError, Store}; use near_primitives::types::ShardId; use std::convert::{TryFrom, TryInto}; @@ -24,9 +25,7 @@ impl TrieCache { let mut guard = self.0.lock().expect(POISONED_LOCK_ERR); for (hash, opt_value_rc) in ops { if let Some(value_rc) = opt_value_rc { - let (value, rc) = - decode_trie_node_with_rc(&value_rc).expect("Don't write invalid values"); - if rc > 0 { + if let (Some(value), _rc) = decode_value_with_rc(&value_rc) { if value.len() < TRIE_LIMIT_CACHED_VALUE_SIZE { guard.cache_set(hash, value.to_vec()); } @@ -119,39 +118,11 @@ pub struct TrieCachingStorage { pub(crate) shard_id: ShardId, } -pub fn merge_refcounted_records(result: &mut Vec, val: &[u8]) -> Result<(), StorageError> { - if val.is_empty() { - return Ok(()); - } - let add_rc = TrieCachingStorage::vec_to_rc(val)?; - if !result.is_empty() { - let result_rc = TrieCachingStorage::vec_to_rc(result)? + add_rc; - - debug_assert_eq!(result[0..(result.len() - 4)], val[0..(val.len() - 4)]); - let len = result.len(); - result[(len - 4)..].copy_from_slice(&result_rc.to_le_bytes()); - if result_rc == 0 { - *result = vec![]; - } - } else { - *result = val.to_vec(); - } - Ok(()) -} - impl TrieCachingStorage { pub fn new(store: Arc, cache: TrieCache, shard_id: ShardId) -> TrieCachingStorage { TrieCachingStorage { store, cache, shard_id } } - fn vec_to_rc(val: &[u8]) -> Result { - decode_trie_node_with_rc(&val).map(|(_bytes, rc)| rc) - } - - fn vec_to_bytes(val: &[u8]) -> Result, StorageError> { - decode_trie_node_with_rc(&val).map(|(bytes, _rc)| bytes.to_vec()) - } - pub(crate) fn get_shard_id_and_hash_from_key( key: &[u8], ) -> Result<(u64, CryptoHash), std::io::Error> { @@ -183,14 +154,10 @@ impl TrieStorage for TrieCachingStorage { .get(ColState, key.as_ref()) .map_err(|_| StorageError::StorageInternalError)?; if let Some(val) = val { - let raw_node = Self::vec_to_bytes(&val); - debug_assert!(Self::vec_to_rc(&val).unwrap() > 0); - if val.len() < TRIE_LIMIT_CACHED_VALUE_SIZE && raw_node.is_ok() { - if let Ok(ref bytes) = raw_node { - guard.cache_set(*hash, bytes.clone()); - } + if val.len() < TRIE_LIMIT_CACHED_VALUE_SIZE { + guard.cache_set(*hash, val.clone()); } - raw_node + Ok(val) } else { // not StorageError::TrieNodeMissing because it's only for TrieMemoryPartialStorage Err(StorageError::StorageInconsistentState("Trie node missing".to_string())) diff --git a/core/store/src/validate.rs b/core/store/src/validate.rs deleted file mode 100644 index 8b137891791..00000000000 --- a/core/store/src/validate.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/neard/src/lib.rs b/neard/src/lib.rs index bcd3d54e858..fd5b9ce11a4 100644 --- a/neard/src/lib.rs +++ b/neard/src/lib.rs @@ -11,7 +11,8 @@ use near_client::{start_client, start_view_client, ClientActor, ViewClientActor} use near_jsonrpc::start_http; use near_network::{NetworkRecipient, PeerManagerActor}; use near_store::migrations::{ - fill_col_outcomes_by_hash, fill_col_transaction_refcount, get_store_version, set_store_version, + fill_col_outcomes_by_hash, fill_col_transaction_refcount, get_store_version, migrate_6_to_7, + set_store_version, }; use near_store::{create_store, Store}; use near_telemetry::TelemetryActor; @@ -102,6 +103,13 @@ pub fn apply_store_migrations(path: &String) { let store = create_store(&path); set_store_version(&store, 6); } + if db_version <= 6 { + // version 6 => 7: + // - make ColState use 8 bytes for refcount (change to merge operator) + // - move ColTransactionRefCount into ColTransactions + // - make ColReceiptIdToShardId refcounted + migrate_6_to_7(path); + } let db_version = get_store_version(path); debug_assert_eq!(db_version, near_primitives::version::DB_VERSION);