Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge statedb flush #3323

Merged
merged 14 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion contrib-contracts/src/starcoin_merkle_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn test_starcoin_merkle() -> Result<()> {
}

{
// change to previout state root.
// change to previous state root.
let old_chain_state = chain_state.fork_at(state_root);
// let state_root = chain_state.state_root();
let _expected_root = MoveValue::vector_u8(state_root.to_vec());
Expand Down
5 changes: 0 additions & 5 deletions executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ pub fn block_execute(
let txn_state_root = chain_state
.commit()
.map_err(BlockExecutorError::BlockChainStateErr)?;
//every transaction's state tree root and tree nodes should save to storage
//TODO merge database flush.
chain_state
.flush()
.map_err(BlockExecutorError::BlockChainStateErr)?;

executed_data.txn_infos.push(TransactionInfo::new(
txn_hash,
Expand Down
1 change: 1 addition & 0 deletions state/state-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ serde = { version = "1.0.130" }
forkable-jellyfish-merkle = { path = "../../commons/forkable-jellyfish-merkle"}
starcoin-state-store-api = {path = "../state-store-api"}
bcs-ext = { package="bcs-ext", path = "../../commons/bcs_ext" }
logger = { path = "../../commons/logger", package="starcoin-logger"}

[dev-dependencies]
starcoin-config= { path = "../../config"}
Expand Down
60 changes: 45 additions & 15 deletions state/state-tree/src/state_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use forkable_jellyfish_merkle::proof::SparseMerkleProof;
use forkable_jellyfish_merkle::{
JellyfishMerkleTree, RawKey, StaleNodeIndex, TreeReader, TreeUpdateBatch,
};
use logger::prelude::debug;
use parking_lot::{Mutex, RwLock};
use starcoin_crypto::hash::*;
use starcoin_state_store_api::*;
Expand All @@ -23,7 +24,8 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct StateCache<K: RawKey> {
root_hash: HashValue,
change_set: TreeUpdateBatch<K>,
change_set_list: Vec<(HashValue, TreeUpdateBatch<K>)>,
split_off_idx: Option<usize>,
}

impl<K> StateCache<K>
Expand All @@ -33,17 +35,22 @@ where
pub fn new(initial_root: HashValue) -> Self {
Self {
root_hash: initial_root,
change_set: TreeUpdateBatch::default(),
change_set_list: Vec::new(),
split_off_idx: None,
}
}

fn reset(&mut self, root_hash: HashValue) {
self.root_hash = root_hash;
self.change_set = TreeUpdateBatch::default();
self.change_set_list = if let Some(split_idx) = self.split_off_idx {
self.change_set_list.split_off(split_idx)
} else {
Vec::new()
};
}

fn add_changeset(&mut self, root_hash: HashValue, cs: TreeUpdateBatch<K>) {
let cur_change_set = &mut self.change_set;
let mut cur_change_set = TreeUpdateBatch::default();
let mut cs_num_stale_leaves = cs.num_stale_leaves;
for stale_node in cs.stale_node_index_batch.iter() {
match cur_change_set.node_batch.remove(&stale_node.node_key) {
Expand All @@ -70,7 +77,7 @@ where
cur_change_set.num_new_leaves += 1;
}
}

self.change_set_list.push((root_hash, cur_change_set));
self.root_hash = root_hash;
}
}
Expand Down Expand Up @@ -196,11 +203,25 @@ where

/// commit the state change into underline storage.
pub fn flush(&self) -> Result<()> {
let (root_hash, change_sets) = self.change_sets();
let change_set_list = {
let mut cache_guard = self.cache.lock();
cache_guard.split_off_idx = Some(cache_guard.change_set_list.len());
cache_guard.change_set_list.clone()
};

debug!("change_set_list len {}", change_set_list.len());
// when self::commit call self::updates(&self, updates: Vec<(K, Option<Blob>)>)
// the param updates is empty cause this situation
if change_set_list.is_empty() {
return Ok(());
}
let mut root_hash = HashValue::default();
let mut node_map = BTreeMap::new();
for (nk, n) in change_sets.node_batch.into_iter() {
node_map.insert(nk, n.try_into()?);
for (hash, change_sets) in change_set_list.into_iter() {
for (nk, n) in change_sets.node_batch.into_iter() {
node_map.insert(nk, n.try_into()?);
}
root_hash = hash;
}
self.storage.write_nodes(node_map)?;
// and then advance the storage root hash
Expand Down Expand Up @@ -259,9 +280,6 @@ where
};
let tree = JellyfishMerkleTree::new(&reader);
let (new_state_root, change_set) = tree.updates(Some(cur_root_hash), updates)?;
// cache.root_hashes.push(new_state_root);
// cache.change_sets.push(change_set);
// cache.root_hash = new_state_root;
cache.add_changeset(new_state_root, change_set);
Ok(new_state_root)
}
Expand Down Expand Up @@ -296,10 +314,18 @@ where
// }

/// get all changes so far based on initial root_hash.
/*
pub fn change_sets(&self) -> (HashValue, TreeUpdateBatch<K>) {
let cache_guard = self.cache.lock();
(cache_guard.root_hash, cache_guard.change_set.clone())
nkysg marked this conversation as resolved.
Show resolved Hide resolved
} */

/// get last changes root_hash
pub fn last_change_sets(&self) -> Option<(HashValue, TreeUpdateBatch<K>)> {
let cache_gurad = self.cache.lock();
cache_gurad.change_set_list.last().cloned()
}

// TODO: to keep atomic with other commit.
// TODO: think about the WriteBatch trait position.
// pub fn save<T>(&self, batch: &mut T) -> Result<()>
Expand Down Expand Up @@ -327,8 +353,10 @@ where
if node_key == &*SPARSE_MERKLE_PLACEHOLDER_HASH {
return Ok(Some(Node::new_null()));
}
if let Some(n) = self.cache.change_set.node_batch.get(node_key).cloned() {
return Ok(Some(n));
for change_set in self.cache.change_set_list.iter().rev() {
if let Some(n) = change_set.1.node_batch.get(node_key).cloned() {
nkysg marked this conversation as resolved.
Show resolved Hide resolved
return Ok(Some(n));
}
}
match self.store.get(node_key) {
Ok(Some(n)) => Ok(Some(n.try_into()?)),
Expand All @@ -350,8 +378,10 @@ where
if node_key == &*SPARSE_MERKLE_PLACEHOLDER_HASH {
return Ok(Some(Node::new_null()));
}
if let Some(n) = self.cache.change_set.node_batch.get(node_key).cloned() {
return Ok(Some(n));
for change_set in self.cache.change_set_list.iter().rev() {
nkysg marked this conversation as resolved.
Show resolved Hide resolved
if let Some(n) = change_set.1.node_batch.get(node_key).cloned() {
return Ok(Some(n));
}
}
match self.store.get(node_key) {
Ok(Some(n)) => Ok(Some(n.try_into()?)),
Expand Down
Loading