Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat(core/binding): Implementation of service state. (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
yejiayu authored Dec 9, 2019
1 parent b6c8c71 commit c32186a
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 7 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions core/binding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
protocol = { path = "../../protocol" }

hasher = { version = "0.1", features = ['hash-keccak'] }
cita_trie = "2.0"
bytes = "0.4"
derive_more = "0.15"
rocksdb = "0.12"
lazy_static = "1.4"
9 changes: 3 additions & 6 deletions core/binding/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}
mod tests;

mod state;
135 changes: 135 additions & 0 deletions core/binding/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
mod trie;
mod trie_db;

pub use trie::{MPTTrie, MPTTrieError};
pub use trie_db::{RocksTrieDB, RocksTrieDBError};

use std::collections::HashMap;

use bytes::Bytes;
use cita_trie::DB as TrieDB;

use protocol::fixed_codec::FixedCodec;
use protocol::traits::ServiceState;
use protocol::types::{Address, Hash, MerkleRoot};
use protocol::ProtocolResult;

pub struct GeneralServiceState<DB: TrieDB> {
trie: MPTTrie<DB>,

// TODO(@yejiayu): The value of HashMap should be changed to Box<dyn Any> to avoid multiple
// serializations.
cache_map: HashMap<Bytes, Bytes>,
stash_map: HashMap<Bytes, Bytes>,
}

impl<DB: TrieDB> GeneralServiceState<DB> {
pub fn new(trie: MPTTrie<DB>) -> Self {
Self {
trie,

cache_map: HashMap::new(),
stash_map: HashMap::new(),
}
}
}

impl<DB: TrieDB> ServiceState for GeneralServiceState<DB> {
fn get<Key: FixedCodec, Ret: FixedCodec>(&self, key: &Key) -> ProtocolResult<Option<Ret>> {
let encoded_key = key.encode_fixed()?;

if let Some(value_bytes) = self.cache_map.get(&encoded_key) {
let inst = <_>::decode_fixed(value_bytes.clone())?;
return Ok(Some(inst));
}

if let Some(value_bytes) = self.stash_map.get(&encoded_key) {
let inst = <_>::decode_fixed(value_bytes.clone())?;
return Ok(Some(inst));
}

if let Some(value_bytes) = self.trie.get(&encoded_key)? {
return Ok(Some(<_>::decode_fixed(value_bytes)?));
}

Ok(None)
}

fn contains<Key: FixedCodec>(&self, key: &Key) -> ProtocolResult<bool> {
let encoded_key = key.encode_fixed()?;

if self.cache_map.contains_key(&encoded_key) {
return Ok(true);
};

if self.stash_map.contains_key(&encoded_key) {
return Ok(true);
};

self.trie.contains(&encoded_key)
}

// Insert a pair of key / value
// Note: This key/value pair will go into the cache first
// and will not be persisted to MPT until `commit` is called.
fn insert<Key: FixedCodec, Value: FixedCodec>(
&mut self,
key: Key,
value: Value,
) -> ProtocolResult<()> {
self.cache_map
.insert(key.encode_fixed()?, value.encode_fixed()?);
Ok(())
}

fn get_account_value<Key: FixedCodec, Ret: FixedCodec>(
&self,
address: &Address,
key: &Key,
) -> ProtocolResult<Option<Ret>> {
let hash_key = get_address_key(address, key)?;
self.get(&hash_key)
}

fn set_account_value<Key: FixedCodec, Val: FixedCodec>(
&mut self,
address: &Address,
key: Key,
val: Val,
) -> ProtocolResult<()> {
let hash_key = get_address_key(address, &key)?;
self.insert(hash_key, val)
}

// Roll back all data in the cache
fn revert_cache(&mut self) -> ProtocolResult<()> {
self.cache_map.clear();
Ok(())
}

// Move data from cache to stash
fn stash(&mut self) -> ProtocolResult<()> {
for (k, v) in self.cache_map.drain() {
self.stash_map.insert(k, v);
}

Ok(())
}

// Persist data from stash into MPT
fn commit(&mut self) -> ProtocolResult<MerkleRoot> {
for (key, value) in self.stash_map.drain() {
self.trie.insert(key, value)?;
}

let root = self.trie.commit()?;
Ok(root)
}
}

fn get_address_key<Key: FixedCodec>(address: &Address, key: &Key) -> ProtocolResult<Hash> {
let mut hash_bytes = address.as_bytes().to_vec();
hash_bytes.extend_from_slice(key.encode_fixed()?.as_ref());

Ok(Hash::digest(Bytes::from(hash_bytes)))
}
77 changes: 77 additions & 0 deletions core/binding/src/state/trie.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::sync::Arc;

use bytes::Bytes;
use cita_trie::{PatriciaTrie, Trie, TrieError, DB as TrieDB};
use derive_more::{Display, From};
use hasher::HasherKeccak;
use lazy_static::lazy_static;

use protocol::types::{Hash, MerkleRoot};
use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult};

lazy_static! {
static ref HASHER_INST: Arc<HasherKeccak> = Arc::new(HasherKeccak::new());
}

pub struct MPTTrie<DB: TrieDB> {
root: MerkleRoot,
trie: PatriciaTrie<DB, HasherKeccak>,
}

impl<DB: TrieDB> MPTTrie<DB> {
pub fn new(db: Arc<DB>) -> Self {
let trie = PatriciaTrie::new(db, Arc::clone(&HASHER_INST));

Self {
root: Hash::from_empty(),
trie,
}
}

pub fn from(root: MerkleRoot, db: Arc<DB>) -> ProtocolResult<Self> {
let trie = PatriciaTrie::from(db, Arc::clone(&HASHER_INST), &root.as_bytes())
.map_err(MPTTrieError::from)?;

Ok(Self { root, trie })
}

pub fn get(&self, key: &Bytes) -> ProtocolResult<Option<Bytes>> {
Ok(self
.trie
.get(key)
.map_err(MPTTrieError::from)?
.map(Bytes::from))
}

pub fn contains(&self, key: &Bytes) -> ProtocolResult<bool> {
Ok(self.trie.contains(key).map_err(MPTTrieError::from)?)
}

pub fn insert(&mut self, key: Bytes, value: Bytes) -> ProtocolResult<()> {
self.trie
.insert(key.to_vec(), value.to_vec())
.map_err(MPTTrieError::from)?;
Ok(())
}

pub fn commit(&mut self) -> ProtocolResult<MerkleRoot> {
let root_bytes = self.trie.root().map_err(MPTTrieError::from)?;
let root = MerkleRoot::from_bytes(Bytes::from(root_bytes))?;
self.root = root;
Ok(self.root.clone())
}
}

#[derive(Debug, Display, From)]
pub enum MPTTrieError {
#[display(fmt = "{:?}", _0)]
Trie(TrieError),
}

impl std::error::Error for MPTTrieError {}

impl From<MPTTrieError> for ProtocolError {
fn from(err: MPTTrieError) -> ProtocolError {
ProtocolError::new(ProtocolErrorKind::Binding, Box::new(err))
}
}
111 changes: 111 additions & 0 deletions core/binding/src/state/trie_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::path::Path;
use std::sync::Arc;

use bytes::Bytes;
use derive_more::{Display, From};
use rocksdb::{Options, WriteBatch, DB};

use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult};

pub struct RocksTrieDB {
light: bool,
db: Arc<DB>,
}

impl RocksTrieDB {
pub fn new<P: AsRef<Path>>(path: P, light: bool) -> ProtocolResult<Self> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);

let db = DB::open(&opts, path).map_err(RocksTrieDBError::from)?;

Ok(RocksTrieDB {
light,
db: Arc::new(db),
})
}
}

impl cita_trie::DB for RocksTrieDB {
type Error = RocksTrieDBError;

fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self
.db
.get(key)
.map_err(RocksTrieDBError::from)?
.map(|v| v.to_vec()))
}

fn contains(&self, key: &[u8]) -> Result<bool, Self::Error> {
Ok(self.db.get(key).map_err(RocksTrieDBError::from)?.is_some())
}

fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Self::Error> {
self.db
.put(Bytes::from(key), Bytes::from(value))
.map_err(RocksTrieDBError::from)?;
Ok(())
}

fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
if keys.len() != values.len() {
return Err(RocksTrieDBError::BatchLengthMismatch);
}

let mut batch = WriteBatch::default();
for i in 0..keys.len() {
let key = &keys[i];
let value = &values[i];
batch.put(key, value).map_err(RocksTrieDBError::from)?;
}

self.db.write(batch).map_err(RocksTrieDBError::from)?;
Ok(())
}

fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
if self.light {
self.db.delete(key).map_err(RocksTrieDBError::from)?;
}
Ok(())
}

fn remove_batch(&self, keys: &[Vec<u8>]) -> Result<(), Self::Error> {
if self.light {
let mut batch = WriteBatch::default();
for key in keys {
batch.delete(key).map_err(RocksTrieDBError::from)?;
}

self.db.write(batch).map_err(RocksTrieDBError::from)?;
}

Ok(())
}

fn flush(&self) -> Result<(), Self::Error> {
Ok(())
}
}

#[derive(Debug, Display, From)]
pub enum RocksTrieDBError {
#[display(fmt = "rocksdb {}", _0)]
RocksDB(rocksdb::Error),

#[display(fmt = "parameters do not match")]
InsertParameter,

#[display(fmt = "batch length dont match")]
BatchLengthMismatch,
}

impl std::error::Error for RocksTrieDBError {}

impl From<RocksTrieDBError> for ProtocolError {
fn from(err: RocksTrieDBError) -> ProtocolError {
ProtocolError::new(ProtocolErrorKind::Binding, Box::new(err))
}
}
1 change: 1 addition & 0 deletions core/binding/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod state;
Loading

0 comments on commit c32186a

Please sign in to comment.