From c32186a1c699e06fda19c993b5b14257b46054a3 Mon Sep 17 00:00:00 2001 From: Jiayu Ye Date: Mon, 9 Dec 2019 23:44:39 +0800 Subject: [PATCH] feat(core/binding): Implementation of service state. (#48) --- Cargo.lock | 9 ++ core/binding/Cargo.toml | 8 ++ core/binding/src/lib.rs | 9 +- core/binding/src/state/mod.rs | 135 ++++++++++++++++++++++++++++++ core/binding/src/state/trie.rs | 77 +++++++++++++++++ core/binding/src/state/trie_db.rs | 111 ++++++++++++++++++++++++ core/binding/src/tests/mod.rs | 1 + core/binding/src/tests/state.rs | 66 +++++++++++++++ protocol/src/lib.rs | 1 + protocol/src/traits/executor.rs | 34 +++++++- protocol/src/traits/mod.rs | 4 + 11 files changed, 448 insertions(+), 7 deletions(-) create mode 100644 core/binding/src/state/mod.rs create mode 100644 core/binding/src/state/trie.rs create mode 100644 core/binding/src/state/trie_db.rs create mode 100644 core/binding/src/tests/mod.rs create mode 100644 core/binding/src/tests/state.rs diff --git a/Cargo.lock b/Cargo.lock index f89f0b3ac..0aa276743 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -446,6 +446,15 @@ dependencies = [ [[package]] name = "core-binding" version = "0.1.0" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "cita_trie 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hasher 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "protocol 0.1.0", + "rocksdb 0.12.4 (registry+https://github.com/rust-lang/crates.io-index)", +] [[package]] name = "core-binding-macro" diff --git a/core/binding/Cargo.toml b/core/binding/Cargo.toml index 898bd17ac..26bbdc443 100644 --- a/core/binding/Cargo.toml +++ b/core/binding/Cargo.toml @@ -7,3 +7,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +protocol = { path = "../../protocol" } + +hasher = { version = "0.1", features = ['hash-keccak'] } +cita_trie = "2.0" +bytes = "0.4" +derive_more = "0.15" +rocksdb = "0.12" +lazy_static = "1.4" diff --git a/core/binding/src/lib.rs b/core/binding/src/lib.rs index 31e1bb209..283cb9afd 100644 --- a/core/binding/src/lib.rs +++ b/core/binding/src/lib.rs @@ -1,7 +1,4 @@ #[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} +mod tests; + +mod state; diff --git a/core/binding/src/state/mod.rs b/core/binding/src/state/mod.rs new file mode 100644 index 000000000..99940c156 --- /dev/null +++ b/core/binding/src/state/mod.rs @@ -0,0 +1,135 @@ +mod trie; +mod trie_db; + +pub use trie::{MPTTrie, MPTTrieError}; +pub use trie_db::{RocksTrieDB, RocksTrieDBError}; + +use std::collections::HashMap; + +use bytes::Bytes; +use cita_trie::DB as TrieDB; + +use protocol::fixed_codec::FixedCodec; +use protocol::traits::ServiceState; +use protocol::types::{Address, Hash, MerkleRoot}; +use protocol::ProtocolResult; + +pub struct GeneralServiceState { + trie: MPTTrie, + + // TODO(@yejiayu): The value of HashMap should be changed to Box to avoid multiple + // serializations. + cache_map: HashMap, + stash_map: HashMap, +} + +impl GeneralServiceState { + pub fn new(trie: MPTTrie) -> Self { + Self { + trie, + + cache_map: HashMap::new(), + stash_map: HashMap::new(), + } + } +} + +impl ServiceState for GeneralServiceState { + fn get(&self, key: &Key) -> ProtocolResult> { + let encoded_key = key.encode_fixed()?; + + if let Some(value_bytes) = self.cache_map.get(&encoded_key) { + let inst = <_>::decode_fixed(value_bytes.clone())?; + return Ok(Some(inst)); + } + + if let Some(value_bytes) = self.stash_map.get(&encoded_key) { + let inst = <_>::decode_fixed(value_bytes.clone())?; + return Ok(Some(inst)); + } + + if let Some(value_bytes) = self.trie.get(&encoded_key)? { + return Ok(Some(<_>::decode_fixed(value_bytes)?)); + } + + Ok(None) + } + + fn contains(&self, key: &Key) -> ProtocolResult { + let encoded_key = key.encode_fixed()?; + + if self.cache_map.contains_key(&encoded_key) { + return Ok(true); + }; + + if self.stash_map.contains_key(&encoded_key) { + return Ok(true); + }; + + self.trie.contains(&encoded_key) + } + + // Insert a pair of key / value + // Note: This key/value pair will go into the cache first + // and will not be persisted to MPT until `commit` is called. + fn insert( + &mut self, + key: Key, + value: Value, + ) -> ProtocolResult<()> { + self.cache_map + .insert(key.encode_fixed()?, value.encode_fixed()?); + Ok(()) + } + + fn get_account_value( + &self, + address: &Address, + key: &Key, + ) -> ProtocolResult> { + let hash_key = get_address_key(address, key)?; + self.get(&hash_key) + } + + fn set_account_value( + &mut self, + address: &Address, + key: Key, + val: Val, + ) -> ProtocolResult<()> { + let hash_key = get_address_key(address, &key)?; + self.insert(hash_key, val) + } + + // Roll back all data in the cache + fn revert_cache(&mut self) -> ProtocolResult<()> { + self.cache_map.clear(); + Ok(()) + } + + // Move data from cache to stash + fn stash(&mut self) -> ProtocolResult<()> { + for (k, v) in self.cache_map.drain() { + self.stash_map.insert(k, v); + } + + Ok(()) + } + + // Persist data from stash into MPT + fn commit(&mut self) -> ProtocolResult { + for (key, value) in self.stash_map.drain() { + self.trie.insert(key, value)?; + } + + let root = self.trie.commit()?; + Ok(root) + } +} + +fn get_address_key(address: &Address, key: &Key) -> ProtocolResult { + let mut hash_bytes = address.as_bytes().to_vec(); + hash_bytes.extend_from_slice(key.encode_fixed()?.as_ref()); + + Ok(Hash::digest(Bytes::from(hash_bytes))) +} diff --git a/core/binding/src/state/trie.rs b/core/binding/src/state/trie.rs new file mode 100644 index 000000000..edf01ddef --- /dev/null +++ b/core/binding/src/state/trie.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use bytes::Bytes; +use cita_trie::{PatriciaTrie, Trie, TrieError, DB as TrieDB}; +use derive_more::{Display, From}; +use hasher::HasherKeccak; +use lazy_static::lazy_static; + +use protocol::types::{Hash, MerkleRoot}; +use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult}; + +lazy_static! { + static ref HASHER_INST: Arc = Arc::new(HasherKeccak::new()); +} + +pub struct MPTTrie { + root: MerkleRoot, + trie: PatriciaTrie, +} + +impl MPTTrie { + pub fn new(db: Arc) -> Self { + let trie = PatriciaTrie::new(db, Arc::clone(&HASHER_INST)); + + Self { + root: Hash::from_empty(), + trie, + } + } + + pub fn from(root: MerkleRoot, db: Arc) -> ProtocolResult { + let trie = PatriciaTrie::from(db, Arc::clone(&HASHER_INST), &root.as_bytes()) + .map_err(MPTTrieError::from)?; + + Ok(Self { root, trie }) + } + + pub fn get(&self, key: &Bytes) -> ProtocolResult> { + Ok(self + .trie + .get(key) + .map_err(MPTTrieError::from)? + .map(Bytes::from)) + } + + pub fn contains(&self, key: &Bytes) -> ProtocolResult { + Ok(self.trie.contains(key).map_err(MPTTrieError::from)?) + } + + pub fn insert(&mut self, key: Bytes, value: Bytes) -> ProtocolResult<()> { + self.trie + .insert(key.to_vec(), value.to_vec()) + .map_err(MPTTrieError::from)?; + Ok(()) + } + + pub fn commit(&mut self) -> ProtocolResult { + let root_bytes = self.trie.root().map_err(MPTTrieError::from)?; + let root = MerkleRoot::from_bytes(Bytes::from(root_bytes))?; + self.root = root; + Ok(self.root.clone()) + } +} + +#[derive(Debug, Display, From)] +pub enum MPTTrieError { + #[display(fmt = "{:?}", _0)] + Trie(TrieError), +} + +impl std::error::Error for MPTTrieError {} + +impl From for ProtocolError { + fn from(err: MPTTrieError) -> ProtocolError { + ProtocolError::new(ProtocolErrorKind::Binding, Box::new(err)) + } +} diff --git a/core/binding/src/state/trie_db.rs b/core/binding/src/state/trie_db.rs new file mode 100644 index 000000000..416711721 --- /dev/null +++ b/core/binding/src/state/trie_db.rs @@ -0,0 +1,111 @@ +use std::path::Path; +use std::sync::Arc; + +use bytes::Bytes; +use derive_more::{Display, From}; +use rocksdb::{Options, WriteBatch, DB}; + +use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult}; + +pub struct RocksTrieDB { + light: bool, + db: Arc, +} + +impl RocksTrieDB { + pub fn new>(path: P, light: bool) -> ProtocolResult { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let db = DB::open(&opts, path).map_err(RocksTrieDBError::from)?; + + Ok(RocksTrieDB { + light, + db: Arc::new(db), + }) + } +} + +impl cita_trie::DB for RocksTrieDB { + type Error = RocksTrieDBError; + + fn get(&self, key: &[u8]) -> Result>, Self::Error> { + Ok(self + .db + .get(key) + .map_err(RocksTrieDBError::from)? + .map(|v| v.to_vec())) + } + + fn contains(&self, key: &[u8]) -> Result { + Ok(self.db.get(key).map_err(RocksTrieDBError::from)?.is_some()) + } + + fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { + self.db + .put(Bytes::from(key), Bytes::from(value)) + .map_err(RocksTrieDBError::from)?; + Ok(()) + } + + fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { + if keys.len() != values.len() { + return Err(RocksTrieDBError::BatchLengthMismatch); + } + + let mut batch = WriteBatch::default(); + for i in 0..keys.len() { + let key = &keys[i]; + let value = &values[i]; + batch.put(key, value).map_err(RocksTrieDBError::from)?; + } + + self.db.write(batch).map_err(RocksTrieDBError::from)?; + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { + if self.light { + self.db.delete(key).map_err(RocksTrieDBError::from)?; + } + Ok(()) + } + + fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { + if self.light { + let mut batch = WriteBatch::default(); + for key in keys { + batch.delete(key).map_err(RocksTrieDBError::from)?; + } + + self.db.write(batch).map_err(RocksTrieDBError::from)?; + } + + Ok(()) + } + + fn flush(&self) -> Result<(), Self::Error> { + Ok(()) + } +} + +#[derive(Debug, Display, From)] +pub enum RocksTrieDBError { + #[display(fmt = "rocksdb {}", _0)] + RocksDB(rocksdb::Error), + + #[display(fmt = "parameters do not match")] + InsertParameter, + + #[display(fmt = "batch length dont match")] + BatchLengthMismatch, +} + +impl std::error::Error for RocksTrieDBError {} + +impl From for ProtocolError { + fn from(err: RocksTrieDBError) -> ProtocolError { + ProtocolError::new(ProtocolErrorKind::Binding, Box::new(err)) + } +} diff --git a/core/binding/src/tests/mod.rs b/core/binding/src/tests/mod.rs new file mode 100644 index 000000000..ceecf476d --- /dev/null +++ b/core/binding/src/tests/mod.rs @@ -0,0 +1 @@ +mod state; diff --git a/core/binding/src/tests/state.rs b/core/binding/src/tests/state.rs new file mode 100644 index 000000000..7be879596 --- /dev/null +++ b/core/binding/src/tests/state.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; + +use bytes::Bytes; +use cita_trie::MemoryDB; + +use protocol::traits::ServiceState; +use protocol::types::{Address, Hash, MerkleRoot}; + +use crate::state::{GeneralServiceState, MPTTrie}; + +#[test] +fn test_state_insert() { + let memdb = Arc::new(MemoryDB::new(false)); + let mut state = new_state(Arc::clone(&memdb), None); + + let key = Hash::digest(Bytes::from("key".to_owned())); + let value = Hash::digest(Bytes::from("value".to_owned())); + state.insert(key.clone(), value.clone()).unwrap(); + let val: Hash = state.get(&key).unwrap().unwrap(); + assert_eq!(val, value); + + state.stash().unwrap(); + let new_root = state.commit().unwrap(); + + let val: Hash = state.get(&key).unwrap().unwrap(); + assert_eq!(val, value); + + let new_state = new_state(Arc::clone(&memdb), Some(new_root)); + let val: Hash = new_state.get(&key).unwrap().unwrap(); + assert_eq!(val, value); +} + +#[test] +fn test_state_account() { + let memdb = Arc::new(MemoryDB::new(false)); + let mut state = new_state(Arc::clone(&memdb), None); + + let address = Address::from_hash(Hash::digest(Bytes::from("test-address"))).unwrap(); + let key = Hash::digest(Bytes::from("key".to_owned())); + let value = Hash::digest(Bytes::from("value".to_owned())); + + state + .set_account_value(&address, key.clone(), value.clone()) + .unwrap(); + let val: Hash = state.get_account_value(&address, &key).unwrap().unwrap(); + assert_eq!(val, value); + + state.stash().unwrap(); + let new_root = state.commit().unwrap(); + + let new_state = new_state(Arc::clone(&memdb), Some(new_root)); + let val: Hash = new_state + .get_account_value(&address, &key) + .unwrap() + .unwrap(); + assert_eq!(val, value); +} + +fn new_state(memdb: Arc, root: Option) -> GeneralServiceState { + let trie = match root { + Some(root) => MPTTrie::from(root, memdb).unwrap(), + None => MPTTrie::new(memdb), + }; + + GeneralServiceState::new(trie) +} diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 27855ad13..d02afcc2f 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -19,6 +19,7 @@ pub enum ProtocolErrorKind { Network, Storage, Runtime, + Binding, // codec Codec, diff --git a/protocol/src/traits/executor.rs b/protocol/src/traits/executor.rs index 4a3358855..59367dfbb 100644 --- a/protocol/src/traits/executor.rs +++ b/protocol/src/traits/executor.rs @@ -1,9 +1,41 @@ use crate::types::{Bloom, MerkleRoot, Receipt}; -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct ExecutorResp { pub receipts: Vec, pub all_cycles_used: u64, pub logs_bloom: Bloom, pub state_root: MerkleRoot, } +// #[derive(Debug, Clone)] +// pub struct ExecutorParams { +// state_root: MerkleRoot, +// epoch_id: u64, +// cycels_limit: u64, +// } +// +// pub trait Executor { +// fn exec( +// &mut self, +// params: &ExecutorParams, +// txs: &[SignedTransaction], +// ) -> ProtocolResult; +// +// fn read( +// &self, +// params: &ExecutorParams, +// caller: &Address, +// request: &TransactionRequest, +// ) -> ProtocolResult; +// } +// +// pub trait ExecutorAdapter { +// fn get_service_inst(&self, service_name: &str) -> +// ProtocolResult; +// +// fn revert_state(&mut self) -> ProtocolResult<()>; +// +// fn stash_state(&mut self) -> ProtocolResult<()>; +// +// fn commit_state(&mut self) -> ProtocolResult; +// } diff --git a/protocol/src/traits/mod.rs b/protocol/src/traits/mod.rs index dacd3d178..bd3553150 100644 --- a/protocol/src/traits/mod.rs +++ b/protocol/src/traits/mod.rs @@ -7,6 +7,10 @@ mod network; mod storage; pub use api::APIAdapter; +pub use binding::{ + AdmissionControl, ChainDB, RequestContext, Service, ServiceSDK, ServiceState, StoreArray, + StoreBool, StoreMap, StoreString, StoreUint64, +}; pub use consensus::{Consensus, ConsensusAdapter, CurrentConsensusStatus, MessageTarget, NodeInfo}; pub use executor::ExecutorResp; pub use mempool::{MemPool, MemPoolAdapter, MixedTxHashes};