Skip to content

Commit

Permalink
Merge statedb flush (#3323)
Browse files Browse the repository at this point in the history
* merge statedb flush

* test_starcoin_merkle add ignore

* add dea_code for test

* fix StateCache reset error

* fix bug

* revert starcoin_merkle_test ignore

* fix fmt

* update get

* remove unused code

* remove fn change_sets

* fix fmt

* add statetree  test

1. add compare continue commit and batch flush test
2. add remove test

* fix clippy

* add test
  • Loading branch information
nkysg authored Apr 14, 2022
1 parent 337c150 commit ab7eb86
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 85 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion contrib-contracts/src/starcoin_merkle_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn test_starcoin_merkle() -> Result<()> {
}

{
// change to previout state root.
// change to previous state root.
let old_chain_state = chain_state.fork_at(state_root);
// let state_root = chain_state.state_root();
let _expected_root = MoveValue::vector_u8(state_root.to_vec());
Expand Down
5 changes: 0 additions & 5 deletions executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ pub fn block_execute(
let txn_state_root = chain_state
.commit()
.map_err(BlockExecutorError::BlockChainStateErr)?;
//every transaction's state tree root and tree nodes should save to storage
//TODO merge database flush.
chain_state
.flush()
.map_err(BlockExecutorError::BlockChainStateErr)?;

executed_data.txn_infos.push(TransactionInfo::new(
txn_hash,
Expand Down
1 change: 1 addition & 0 deletions state/state-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ serde = { version = "1.0.130" }
forkable-jellyfish-merkle = { path = "../../commons/forkable-jellyfish-merkle"}
starcoin-state-store-api = {path = "../state-store-api"}
bcs-ext = { package="bcs-ext", path = "../../commons/bcs_ext" }
logger = { path = "../../commons/logger", package="starcoin-logger"}

[dev-dependencies]
starcoin-config= { path = "../../config"}
Expand Down
60 changes: 45 additions & 15 deletions state/state-tree/src/state_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use forkable_jellyfish_merkle::proof::SparseMerkleProof;
use forkable_jellyfish_merkle::{
JellyfishMerkleTree, RawKey, StaleNodeIndex, TreeReader, TreeUpdateBatch,
};
use logger::prelude::debug;
use parking_lot::{Mutex, RwLock};
use starcoin_crypto::hash::*;
use starcoin_state_store_api::*;
Expand All @@ -23,7 +24,8 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct StateCache<K: RawKey> {
root_hash: HashValue,
change_set: TreeUpdateBatch<K>,
change_set_list: Vec<(HashValue, TreeUpdateBatch<K>)>,
split_off_idx: Option<usize>,
}

impl<K> StateCache<K>
Expand All @@ -33,17 +35,22 @@ where
pub fn new(initial_root: HashValue) -> Self {
Self {
root_hash: initial_root,
change_set: TreeUpdateBatch::default(),
change_set_list: Vec::new(),
split_off_idx: None,
}
}

fn reset(&mut self, root_hash: HashValue) {
self.root_hash = root_hash;
self.change_set = TreeUpdateBatch::default();
self.change_set_list = if let Some(split_idx) = self.split_off_idx {
self.change_set_list.split_off(split_idx)
} else {
Vec::new()
};
}

fn add_changeset(&mut self, root_hash: HashValue, cs: TreeUpdateBatch<K>) {
let cur_change_set = &mut self.change_set;
let mut cur_change_set = TreeUpdateBatch::default();
let mut cs_num_stale_leaves = cs.num_stale_leaves;
for stale_node in cs.stale_node_index_batch.iter() {
match cur_change_set.node_batch.remove(&stale_node.node_key) {
Expand All @@ -70,7 +77,7 @@ where
cur_change_set.num_new_leaves += 1;
}
}

self.change_set_list.push((root_hash, cur_change_set));
self.root_hash = root_hash;
}
}
Expand Down Expand Up @@ -196,11 +203,25 @@ where

/// commit the state change into underline storage.
pub fn flush(&self) -> Result<()> {
let (root_hash, change_sets) = self.change_sets();
let change_set_list = {
let mut cache_guard = self.cache.lock();
cache_guard.split_off_idx = Some(cache_guard.change_set_list.len());
cache_guard.change_set_list.clone()
};

debug!("change_set_list len {}", change_set_list.len());
// when self::commit call self::updates(&self, updates: Vec<(K, Option<Blob>)>)
// the param updates is empty cause this situation
if change_set_list.is_empty() {
return Ok(());
}
let mut root_hash = HashValue::default();
let mut node_map = BTreeMap::new();
for (nk, n) in change_sets.node_batch.into_iter() {
node_map.insert(nk, n.try_into()?);
for (hash, change_sets) in change_set_list.into_iter() {
for (nk, n) in change_sets.node_batch.into_iter() {
node_map.insert(nk, n.try_into()?);
}
root_hash = hash;
}
self.storage.write_nodes(node_map)?;
// and then advance the storage root hash
Expand Down Expand Up @@ -259,9 +280,6 @@ where
};
let tree = JellyfishMerkleTree::new(&reader);
let (new_state_root, change_set) = tree.updates(Some(cur_root_hash), updates)?;
// cache.root_hashes.push(new_state_root);
// cache.change_sets.push(change_set);
// cache.root_hash = new_state_root;
cache.add_changeset(new_state_root, change_set);
Ok(new_state_root)
}
Expand Down Expand Up @@ -296,10 +314,18 @@ where
// }

/// get all changes so far based on initial root_hash.
/*
pub fn change_sets(&self) -> (HashValue, TreeUpdateBatch<K>) {
let cache_guard = self.cache.lock();
(cache_guard.root_hash, cache_guard.change_set.clone())
} */

/// get last changes root_hash
pub fn last_change_sets(&self) -> Option<(HashValue, TreeUpdateBatch<K>)> {
let cache_gurad = self.cache.lock();
cache_gurad.change_set_list.last().cloned()
}

// TODO: to keep atomic with other commit.
// TODO: think about the WriteBatch trait position.
// pub fn save<T>(&self, batch: &mut T) -> Result<()>
Expand Down Expand Up @@ -327,8 +353,10 @@ where
if node_key == &*SPARSE_MERKLE_PLACEHOLDER_HASH {
return Ok(Some(Node::new_null()));
}
if let Some(n) = self.cache.change_set.node_batch.get(node_key).cloned() {
return Ok(Some(n));
for change_set in self.cache.change_set_list.iter().rev() {
if let Some(n) = change_set.1.node_batch.get(node_key).cloned() {
return Ok(Some(n));
}
}
match self.store.get(node_key) {
Ok(Some(n)) => Ok(Some(n.try_into()?)),
Expand All @@ -350,8 +378,10 @@ where
if node_key == &*SPARSE_MERKLE_PLACEHOLDER_HASH {
return Ok(Some(Node::new_null()));
}
if let Some(n) = self.cache.change_set.node_batch.get(node_key).cloned() {
return Ok(Some(n));
for change_set in self.cache.change_set_list.iter().rev() {
if let Some(n) = change_set.1.node_batch.get(node_key).cloned() {
return Ok(Some(n));
}
}
match self.store.get(node_key) {
Ok(Some(n)) => Ok(Some(n.try_into()?)),
Expand Down
Loading

0 comments on commit ab7eb86

Please sign in to comment.