From ab7eb86a8544615638b9242f7697e88b3cf9c973 Mon Sep 17 00:00:00 2001 From: nk_ysg Date: Thu, 14 Apr 2022 17:21:13 +0800 Subject: [PATCH] Merge statedb flush (#3323) * merge statedb flush * test_starcoin_merkle add ignore * add dea_code for test * fix StateCache reset error * fix bug * revert starcoin_merkle_test ignore * fix fmt * update get * remove unused code * remove fn change_sets * fix fmt * add statetree test 1. add compare continue commit and batch flush test 2. add remove test * fix clippy * add test --- Cargo.lock | 1 + contrib-contracts/src/starcoin_merkle_test.rs | 2 +- executor/src/block_executor.rs | 5 - state/state-tree/Cargo.toml | 1 + state/state-tree/src/state_tree.rs | 60 +++-- state/state-tree/src/state_tree_test.rs | 215 ++++++++++++------ 6 files changed, 199 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b91b03d74..590ac1f321 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10277,6 +10277,7 @@ dependencies = [ "serde 1.0.136", "starcoin-config", "starcoin-crypto", + "starcoin-logger", "starcoin-state-store-api", "starcoin-storage", "starcoin-types", diff --git a/contrib-contracts/src/starcoin_merkle_test.rs b/contrib-contracts/src/starcoin_merkle_test.rs index 0dc3ae482d..dbc6f8f581 100644 --- a/contrib-contracts/src/starcoin_merkle_test.rs +++ b/contrib-contracts/src/starcoin_merkle_test.rs @@ -61,7 +61,7 @@ fn test_starcoin_merkle() -> Result<()> { } { - // change to previout state root. + // change to previous state root. let old_chain_state = chain_state.fork_at(state_root); // let state_root = chain_state.state_root(); let _expected_root = MoveValue::vector_u8(state_root.to_vec()); diff --git a/executor/src/block_executor.rs b/executor/src/block_executor.rs index 10431cac0b..e3c11b69c6 100644 --- a/executor/src/block_executor.rs +++ b/executor/src/block_executor.rs @@ -63,11 +63,6 @@ pub fn block_execute( let txn_state_root = chain_state .commit() .map_err(BlockExecutorError::BlockChainStateErr)?; - //every transaction's state tree root and tree nodes should save to storage - //TODO merge database flush. - chain_state - .flush() - .map_err(BlockExecutorError::BlockChainStateErr)?; executed_data.txn_infos.push(TransactionInfo::new( txn_hash, diff --git a/state/state-tree/Cargo.toml b/state/state-tree/Cargo.toml index f62dd615db..089a6e4921 100644 --- a/state/state-tree/Cargo.toml +++ b/state/state-tree/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "1.0.130" } forkable-jellyfish-merkle = { path = "../../commons/forkable-jellyfish-merkle"} starcoin-state-store-api = {path = "../state-store-api"} bcs-ext = { package="bcs-ext", path = "../../commons/bcs_ext" } +logger = { path = "../../commons/logger", package="starcoin-logger"} [dev-dependencies] starcoin-config= { path = "../../config"} diff --git a/state/state-tree/src/state_tree.rs b/state/state-tree/src/state_tree.rs index a762e7a921..b816d3d66f 100644 --- a/state/state-tree/src/state_tree.rs +++ b/state/state-tree/src/state_tree.rs @@ -6,6 +6,7 @@ use forkable_jellyfish_merkle::proof::SparseMerkleProof; use forkable_jellyfish_merkle::{ JellyfishMerkleTree, RawKey, StaleNodeIndex, TreeReader, TreeUpdateBatch, }; +use logger::prelude::debug; use parking_lot::{Mutex, RwLock}; use starcoin_crypto::hash::*; use starcoin_state_store_api::*; @@ -23,7 +24,8 @@ use std::sync::Arc; #[derive(Clone)] pub struct StateCache { root_hash: HashValue, - change_set: TreeUpdateBatch, + change_set_list: Vec<(HashValue, TreeUpdateBatch)>, + split_off_idx: Option, } impl StateCache @@ -33,17 +35,22 @@ where pub fn new(initial_root: HashValue) -> Self { Self { root_hash: initial_root, - change_set: TreeUpdateBatch::default(), + change_set_list: Vec::new(), + split_off_idx: None, } } fn reset(&mut self, root_hash: HashValue) { self.root_hash = root_hash; - self.change_set = TreeUpdateBatch::default(); + self.change_set_list = if let Some(split_idx) = self.split_off_idx { + self.change_set_list.split_off(split_idx) + } else { + Vec::new() + }; } fn add_changeset(&mut self, root_hash: HashValue, cs: TreeUpdateBatch) { - let cur_change_set = &mut self.change_set; + let mut cur_change_set = TreeUpdateBatch::default(); let mut cs_num_stale_leaves = cs.num_stale_leaves; for stale_node in cs.stale_node_index_batch.iter() { match cur_change_set.node_batch.remove(&stale_node.node_key) { @@ -70,7 +77,7 @@ where cur_change_set.num_new_leaves += 1; } } - + self.change_set_list.push((root_hash, cur_change_set)); self.root_hash = root_hash; } } @@ -196,11 +203,25 @@ where /// commit the state change into underline storage. pub fn flush(&self) -> Result<()> { - let (root_hash, change_sets) = self.change_sets(); + let change_set_list = { + let mut cache_guard = self.cache.lock(); + cache_guard.split_off_idx = Some(cache_guard.change_set_list.len()); + cache_guard.change_set_list.clone() + }; + debug!("change_set_list len {}", change_set_list.len()); + // when self::commit call self::updates(&self, updates: Vec<(K, Option)>) + // the param updates is empty cause this situation + if change_set_list.is_empty() { + return Ok(()); + } + let mut root_hash = HashValue::default(); let mut node_map = BTreeMap::new(); - for (nk, n) in change_sets.node_batch.into_iter() { - node_map.insert(nk, n.try_into()?); + for (hash, change_sets) in change_set_list.into_iter() { + for (nk, n) in change_sets.node_batch.into_iter() { + node_map.insert(nk, n.try_into()?); + } + root_hash = hash; } self.storage.write_nodes(node_map)?; // and then advance the storage root hash @@ -259,9 +280,6 @@ where }; let tree = JellyfishMerkleTree::new(&reader); let (new_state_root, change_set) = tree.updates(Some(cur_root_hash), updates)?; - // cache.root_hashes.push(new_state_root); - // cache.change_sets.push(change_set); - // cache.root_hash = new_state_root; cache.add_changeset(new_state_root, change_set); Ok(new_state_root) } @@ -296,10 +314,18 @@ where // } /// get all changes so far based on initial root_hash. + /* pub fn change_sets(&self) -> (HashValue, TreeUpdateBatch) { let cache_guard = self.cache.lock(); (cache_guard.root_hash, cache_guard.change_set.clone()) + } */ + + /// get last changes root_hash + pub fn last_change_sets(&self) -> Option<(HashValue, TreeUpdateBatch)> { + let cache_gurad = self.cache.lock(); + cache_gurad.change_set_list.last().cloned() } + // TODO: to keep atomic with other commit. // TODO: think about the WriteBatch trait position. // pub fn save(&self, batch: &mut T) -> Result<()> @@ -327,8 +353,10 @@ where if node_key == &*SPARSE_MERKLE_PLACEHOLDER_HASH { return Ok(Some(Node::new_null())); } - if let Some(n) = self.cache.change_set.node_batch.get(node_key).cloned() { - return Ok(Some(n)); + for change_set in self.cache.change_set_list.iter().rev() { + if let Some(n) = change_set.1.node_batch.get(node_key).cloned() { + return Ok(Some(n)); + } } match self.store.get(node_key) { Ok(Some(n)) => Ok(Some(n.try_into()?)), @@ -350,8 +378,10 @@ where if node_key == &*SPARSE_MERKLE_PLACEHOLDER_HASH { return Ok(Some(Node::new_null())); } - if let Some(n) = self.cache.change_set.node_batch.get(node_key).cloned() { - return Ok(Some(n)); + for change_set in self.cache.change_set_list.iter().rev() { + if let Some(n) = change_set.1.node_batch.get(node_key).cloned() { + return Ok(Some(n)); + } } match self.store.get(node_key) { Ok(Some(n)) => Ok(Some(n.try_into()?)), diff --git a/state/state-tree/src/state_tree_test.rs b/state/state-tree/src/state_tree_test.rs index 20c528909a..a890660614 100644 --- a/state/state-tree/src/state_tree_test.rs +++ b/state/state-tree/src/state_tree_test.rs @@ -24,77 +24,156 @@ pub fn update_nibble(original_key: &HashValueKey, n: usize, nibble: u8) -> HashV } #[test] -pub fn test_put_blob() -> Result<()> { - let s = MockStateNodeStore::new(); - let state = StateTree::::new(Arc::new(s), None); - assert_eq!(state.root_hash(), *SPARSE_MERKLE_PLACEHOLDER_HASH); +pub fn test_put_blob_continue_commit_flush_same() -> Result<()> { + let s1 = MockStateNodeStore::new(); + let state1 = StateTree::::new(Arc::new(s1), None); + assert_eq!(state1.root_hash(), *SPARSE_MERKLE_PLACEHOLDER_HASH); + + let s2 = MockStateNodeStore::new(); + let state2 = StateTree::::new(Arc::new(s2), None); + assert_eq!(state2.root_hash(), *SPARSE_MERKLE_PLACEHOLDER_HASH); let hash_value = HashValue::random().into(); + let account11 = update_nibble(&hash_value, 0, 1); + let account11 = update_nibble(&account11, 2, 2); + let account21 = account11; - let account1 = update_nibble(&hash_value, 0, 1); - let account1 = update_nibble(&account1, 2, 2); - state.put(account1, vec![0, 0, 0]); + state1.put(account11, vec![0, 0, 0]); + assert_eq!(state1.get(&account11)?, Some(vec![0, 0, 0])); + assert_eq!(state1.get(&update_nibble(&hash_value, 0, 8))?, None); - assert_eq!(state.get(&account1)?, Some(vec![0, 0, 0])); - assert_eq!(state.get(&update_nibble(&hash_value, 0, 8))?, None); + state2.put(account21, vec![0, 0, 0]); + assert_eq!(state2.get(&account21)?, Some(vec![0, 0, 0])); + assert_eq!(state2.get(&update_nibble(&hash_value, 0, 8))?, None); - let new_root_hash = state.commit()?; - assert_eq!(state.root_hash(), new_root_hash); - assert_eq!(state.get(&account1)?, Some(vec![0, 0, 0])); - assert_eq!(state.get(&update_nibble(&hash_value, 0, 8))?, None); - - let (root, updates) = state.change_sets(); - assert_eq!(root, new_root_hash); - assert_eq!(updates.num_stale_leaves, 0); - assert_eq!(updates.num_new_leaves, 1); - assert_eq!(updates.node_batch.len(), 1); - assert_eq!(updates.stale_node_index_batch.len(), 1); - - let account2 = update_nibble(&account1, 0, 2); - state.put(account2, vec![0, 0, 0]); - assert_eq!(state.get(&account2)?, Some(vec![0, 0, 0])); - let new_root_hash = state.commit()?; - assert_eq!(state.root_hash(), new_root_hash); - assert_eq!(state.get(&account2)?, Some(vec![0, 0, 0])); - let (root, updates) = state.change_sets(); - assert_eq!(root, new_root_hash); - assert_eq!(updates.num_stale_leaves, 0); - assert_eq!(updates.num_new_leaves, 2); - assert_eq!(updates.node_batch.len(), 3); - assert_eq!(updates.stale_node_index_batch.len(), 1); + let new_root_hash1 = state1.commit()?; + assert_eq!(state1.root_hash(), new_root_hash1); + assert_eq!(state1.get(&account11)?, Some(vec![0, 0, 0])); + assert_eq!(state1.get(&update_nibble(&hash_value, 0, 8))?, None); + + let new_root_hash2 = state2.commit()?; + assert_eq!(state2.root_hash(), new_root_hash2); + assert_eq!(state2.get(&account11)?, Some(vec![0, 0, 0])); + assert_eq!(state2.get(&update_nibble(&hash_value, 0, 8))?, None); + + let (root1, updates1) = state1.last_change_sets().unwrap(); + assert_eq!(root1, new_root_hash1); + assert_eq!(updates1.num_stale_leaves, 0); + assert_eq!(updates1.num_new_leaves, 1); + assert_eq!(updates1.node_batch.len(), 1); + assert_eq!(updates1.stale_node_index_batch.len(), 1); + state1.flush()?; + + let (root2, updates2) = state2.last_change_sets().unwrap(); + assert_eq!(root2, new_root_hash2); + assert_eq!(updates2.num_stale_leaves, 0); + assert_eq!(updates2.num_new_leaves, 1); + assert_eq!(updates2.node_batch.len(), 1); + assert_eq!(updates2.stale_node_index_batch.len(), 1); + assert_eq!(root1, root2); + + let account12 = update_nibble(&account11, 0, 2); + state1.put(account12, vec![0, 0, 0]); + assert_eq!(state1.get(&account12)?, Some(vec![0, 0, 0])); + let new_root_hash1 = state1.commit()?; + assert_eq!(state1.root_hash(), new_root_hash1); + assert_eq!(state1.get(&account12)?, Some(vec![0, 0, 0])); + let (root1, updates1) = state1.last_change_sets().unwrap(); + assert_eq!(root1, new_root_hash1); + assert_eq!(updates1.num_stale_leaves, 0); + assert_eq!(updates1.num_new_leaves, 1); + assert_eq!(updates1.node_batch.len(), 2); + assert_eq!(updates1.stale_node_index_batch.len(), 0); + state1.flush()?; + + let account22 = update_nibble(&account21, 0, 2); + state2.put(account22, vec![0, 0, 0]); + assert_eq!(state2.get(&account12)?, Some(vec![0, 0, 0])); + let new_root_hash2 = state2.commit()?; + assert_eq!(state2.root_hash(), new_root_hash2); + assert_eq!(state2.get(&account22)?, Some(vec![0, 0, 0])); + let (root2, updates2) = state2.last_change_sets().unwrap(); + assert_eq!(root2, new_root_hash2); + assert_eq!(updates2.num_stale_leaves, 0); + assert_eq!(updates2.num_new_leaves, 1); + assert_eq!(updates2.node_batch.len(), 2); + assert_eq!(updates2.stale_node_index_batch.len(), 0); + assert_eq!(root1, root2); // modify existed account - state.put(account1, vec![1, 1, 1]); - assert_eq!(state.get(&account1)?, Some(vec![1, 1, 1])); - let new_root_hash = state.commit()?; - assert_eq!(state.root_hash(), new_root_hash); - assert_eq!(state.get(&account1)?, Some(vec![1, 1, 1])); - let (root, updates) = state.change_sets(); - assert_eq!(root, new_root_hash); - assert_eq!(updates.num_stale_leaves, 0); - assert_eq!(updates.num_new_leaves, 2); - assert_eq!(updates.node_batch.len(), 3); - assert_eq!(updates.stale_node_index_batch.len(), 1); + state1.put(account11, vec![1, 1, 1]); + assert_eq!(state1.get(&account11)?, Some(vec![1, 1, 1])); + let new_root_hash1 = state1.commit()?; + assert_eq!(state1.root_hash(), new_root_hash1); + assert_eq!(state1.get(&account11)?, Some(vec![1, 1, 1])); + let (root1, updates1) = state1.last_change_sets().unwrap(); + assert_eq!(root1, new_root_hash1); + assert_eq!(updates1.num_stale_leaves, 1); + assert_eq!(updates1.num_new_leaves, 1); + assert_eq!(updates1.node_batch.len(), 2); + assert_eq!(updates1.stale_node_index_batch.len(), 2); + state1.flush()?; - let account3 = update_nibble(&account1, 2, 3); - for (k, v) in vec![(account1, vec![1, 1, 0]), (account3, vec![0, 0, 0])] { - state.put(k, v); + // modify existed account + state2.put(account21, vec![1, 1, 1]); + assert_eq!(state2.get(&account21)?, Some(vec![1, 1, 1])); + let new_root_hash2 = state2.commit()?; + assert_eq!(state2.root_hash(), new_root_hash2); + assert_eq!(state2.get(&account21)?, Some(vec![1, 1, 1])); + let (root2, updates2) = state2.last_change_sets().unwrap(); + assert_eq!(root2, new_root_hash2); + assert_eq!(updates2.num_stale_leaves, 1); + assert_eq!(updates2.num_new_leaves, 1); + assert_eq!(updates2.node_batch.len(), 2); + assert_eq!(updates2.stale_node_index_batch.len(), 2); + + let account13 = update_nibble(&account11, 2, 3); + for (k, v) in vec![(account11, vec![1, 1, 0]), (account13, vec![0, 0, 0])] { + state1.put(k, v); } - assert_eq!(state.get(&account1)?, Some(vec![1, 1, 0])); - assert_eq!(state.get(&account2)?, Some(vec![0, 0, 0])); - assert_eq!(state.get(&account3)?, Some(vec![0, 0, 0])); + assert_eq!(state1.get(&account11)?, Some(vec![1, 1, 0])); + assert_eq!(state1.get(&account12)?, Some(vec![0, 0, 0])); + assert_eq!(state1.get(&account13)?, Some(vec![0, 0, 0])); + let new_root_hash1 = state1.commit()?; + assert_eq!(state1.root_hash(), new_root_hash1); + assert_eq!(state1.get(&account11)?, Some(vec![1, 1, 0])); + assert_eq!(state1.get(&account12)?, Some(vec![0, 0, 0])); + assert_eq!(state1.get(&account13)?, Some(vec![0, 0, 0])); + let (root1, updates1) = state1.last_change_sets().unwrap(); + assert_eq!(updates1.num_stale_leaves, 1); + assert_eq!(updates1.num_new_leaves, 2); + assert_eq!(updates1.node_batch.len(), 5); + assert_eq!(updates1.stale_node_index_batch.len(), 2); - let new_root_hash = state.commit()?; - assert_eq!(state.root_hash(), new_root_hash); - assert_eq!(state.get(&account1)?, Some(vec![1, 1, 0])); - assert_eq!(state.get(&account2)?, Some(vec![0, 0, 0])); - assert_eq!(state.get(&account3)?, Some(vec![0, 0, 0])); + let account23 = update_nibble(&account21, 2, 3); + for (k, v) in vec![(account21, vec![1, 1, 0]), (account23, vec![0, 0, 0])] { + state2.put(k, v); + } + assert_eq!(state2.get(&account21)?, Some(vec![1, 1, 0])); + assert_eq!(state2.get(&account22)?, Some(vec![0, 0, 0])); + assert_eq!(state2.get(&account23)?, Some(vec![0, 0, 0])); + let new_root_hash2 = state2.commit()?; + assert_eq!(state2.root_hash(), new_root_hash2); + assert_eq!(state2.get(&account11)?, Some(vec![1, 1, 0])); + assert_eq!(state2.get(&account12)?, Some(vec![0, 0, 0])); + assert_eq!(state2.get(&account13)?, Some(vec![0, 0, 0])); + let (root2, updates2) = state2.last_change_sets().unwrap(); + assert_eq!(updates2.num_stale_leaves, 1); + assert_eq!(updates2.num_new_leaves, 2); + assert_eq!(updates2.node_batch.len(), 5); + assert_eq!(updates2.stale_node_index_batch.len(), 2); + assert_eq!(root1, root2); + + // test delete account + state1.remove(&account11); + state1.commit()?; + state1.flush()?; + assert_eq!(state1.get(&account11)?, None); + + state2.remove(&account21); + state2.commit()?; + assert_eq!(state2.get(&account21)?, None); - let (_, updates) = state.change_sets(); - assert_eq!(updates.num_stale_leaves, 0); - assert_eq!(updates.num_new_leaves, 3); - assert_eq!(updates.node_batch.len(), 6); - assert_eq!(updates.stale_node_index_batch.len(), 1); Ok(()) } @@ -228,7 +307,7 @@ pub fn test_state_multi_commit_missing_node() -> Result<()> { let state = StateTree::new(Arc::new(storage.clone()), None); let hash_value1 = HashValueKey(HashValue::random()); let value1 = vec![1u8, 2u8]; - state.put(hash_value1, value1); + state.put(hash_value1, value1.clone()); state.commit()?; let root_hash1 = state.root_hash(); let hash_value2 = HashValueKey(HashValue::random()); @@ -240,8 +319,7 @@ pub fn test_state_multi_commit_missing_node() -> Result<()> { state.flush()?; let root_hash2 = state.root_hash(); let state1 = StateTree::new(Arc::new(storage.clone()), Some(root_hash1)); - let result = state1.get(&hash_value1); - assert!(result.is_err(), "Missing node at HashValue"); + assert_eq!(state1.get(&hash_value1)?, Some(value1)); let state2 = StateTree::new(Arc::new(storage), Some(root_hash2)); assert_eq!(state2.get(&hash_value1)?, Some(value12)); @@ -277,7 +355,16 @@ pub fn test_state_multi_commit_and_flush() -> Result<()> { assert_eq!(state1.get(&hash_value1)?, Some(value1)); let state2 = StateTree::new(Arc::new(storage), Some(root_hash2)); - assert_eq!(state2.get(&hash_value1)?, Some(value12)); + assert_eq!(state2.get(&hash_value1)?, Some(value12.clone())); assert_eq!(state2.get(&hash_value2)?, Some(value2)); + + state.remove(&hash_value1); + state.commit()?; + assert_eq!(state.get(&hash_value1)?, None); + state.flush()?; + assert_eq!(state2.get(&hash_value1)?, Some(value12)); + + let hash_value3 = HashValueKey(HashValue::random()); + assert_eq!(state.get(&hash_value3)?, None); Ok(()) }