From 75eb0b15e58212699aac764882acd0a1e873a5b4 Mon Sep 17 00:00:00 2001 From: Jiayu Ye Date: Tue, 6 Aug 2019 18:39:09 +0800 Subject: [PATCH] Update README.md --- .gitignore | 3 + Cargo.toml | 8 +- core/storage/Cargo.toml | 6 + core/storage/src/adapter/mod.rs | 1 + core/storage/src/adapter/rocks.rs | 186 ++++++++++++++++++ core/storage/src/lib.rs | 50 ++++- {core/database => protocol}/Cargo.toml | 6 +- protocol/codec/Cargo.toml | 9 - protocol/codec/src/lib.rs | 7 - .../src/lib.rs => protocol/src/codec/mod.rs | 0 protocol/src/lib.rs | 54 +++++ protocol/src/traits/executor.rs | 15 ++ protocol/src/traits/mod.rs | 5 + protocol/src/traits/storage.rs | 48 +++++ protocol/src/types/mod.rs | 41 ++++ protocol/src/types/primitive.rs | 66 +++++++ protocol/src/types/transaction.rs | 56 ++++++ protocol/traits/Cargo.toml | 9 - protocol/traits/src/lib.rs | 7 - protocol/types/Cargo.toml | 9 - protocol/types/src/lib.rs | 7 - src/main.rs | 22 +-- 22 files changed, 546 insertions(+), 69 deletions(-) create mode 100644 core/storage/src/adapter/mod.rs create mode 100644 core/storage/src/adapter/rocks.rs rename {core/database => protocol}/Cargo.toml (73%) delete mode 100644 protocol/codec/Cargo.toml delete mode 100644 protocol/codec/src/lib.rs rename core/database/src/lib.rs => protocol/src/codec/mod.rs (100%) create mode 100644 protocol/src/lib.rs create mode 100644 protocol/src/traits/executor.rs create mode 100644 protocol/src/traits/mod.rs create mode 100644 protocol/src/traits/storage.rs create mode 100644 protocol/src/types/mod.rs create mode 100644 protocol/src/types/primitive.rs create mode 100644 protocol/src/types/transaction.rs delete mode 100644 protocol/traits/Cargo.toml delete mode 100644 protocol/traits/src/lib.rs delete mode 100644 protocol/types/Cargo.toml delete mode 100644 protocol/types/src/lib.rs diff --git a/.gitignore b/.gitignore index 7c350cb44..4c488293c 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ Cargo.lock # IDE .idea/ .vscode/ + +# dev +devtools/data diff --git a/Cargo.toml b/Cargo.toml index 7335ea300..b96d2f31d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,9 @@ authors = ["Cryptape Technologies "] edition = "2018" [dependencies] +protocol = { path = "./protocol" } +core-storage = { path = "./core/storage" } + runtime = "0.3.0-alpha.6" [workspace] @@ -20,13 +23,10 @@ members = [ "core/api", "core/bank", "core/consensus", - "core/database", "core/executor", "core/mempool", "core/network", "core/storage", - "protocol/codec", - "protocol/traits", - "protocol/types", + "protocol", ] diff --git a/core/storage/Cargo.toml b/core/storage/Cargo.toml index 5df7efc41..a406310f0 100644 --- a/core/storage/Cargo.toml +++ b/core/storage/Cargo.toml @@ -7,3 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +protocol = { path = "../../protocol" } + +bytes = "0.4" +parking_lot = "0.9" +async-trait = "0.1" +rocksdb = "0.12" diff --git a/core/storage/src/adapter/mod.rs b/core/storage/src/adapter/mod.rs new file mode 100644 index 000000000..9f2fce510 --- /dev/null +++ b/core/storage/src/adapter/mod.rs @@ -0,0 +1 @@ +pub mod rocks; diff --git a/core/storage/src/adapter/rocks.rs b/core/storage/src/adapter/rocks.rs new file mode 100644 index 000000000..cbd3fb614 --- /dev/null +++ b/core/storage/src/adapter/rocks.rs @@ -0,0 +1,186 @@ +use std::error::Error; +use std::fmt; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use rocksdb::{ColumnFamily, Options, WriteBatch, DB}; + +use protocol::traits::{StorageAdapter, StorageCategory}; +use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult}; + +#[derive(Debug)] +pub struct RocksAdapter { + db: Arc, +} + +impl RocksAdapter { + pub fn new(path: String) -> ProtocolResult { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let categories = [ + map_category(StorageCategory::Epoch), + map_category(StorageCategory::Receipt), + map_category(StorageCategory::SignedTransaction), + ]; + + let db = DB::open_cf(&opts, path, categories.iter()).map_err(RocksAdapterError::from)?; + + Ok(RocksAdapter { db: Arc::new(db) }) + } +} + +#[async_trait] +impl StorageAdapter for RocksAdapter { + async fn get(&self, c: StorageCategory, key: Bytes) -> ProtocolResult> { + let column = get_column(&self.db, c)?; + let v = self + .db + .get_cf(column, key) + .map_err(RocksAdapterError::from)?; + + Ok(v.map(|v| Bytes::from(v.to_vec()))) + } + + async fn get_batch( + &self, + c: StorageCategory, + keys: Vec, + ) -> ProtocolResult>> { + let column = get_column(&self.db, c)?; + + let mut values = Vec::with_capacity(keys.len()); + for key in keys { + let v = self + .db + .get_cf(column, key) + .map_err(RocksAdapterError::from)?; + + values.push(v.map(|v| Bytes::from(v.to_vec()))); + } + + Ok(values) + } + + async fn insert(&self, c: StorageCategory, key: Bytes, value: Bytes) -> ProtocolResult<()> { + let column = get_column(&self.db, c)?; + self.db + .put_cf(column, key.to_vec(), value.to_vec()) + .map_err(RocksAdapterError::from)?; + Ok(()) + } + + async fn insert_batch( + &self, + c: StorageCategory, + keys: Vec, + values: Vec, + ) -> ProtocolResult<()> { + if keys.len() != values.len() { + return Err(RocksAdapterError::InsertParameter.into()); + } + + let column = get_column(&self.db, c)?; + + let mut batch = WriteBatch::default(); + for (key, value) in keys.into_iter().zip(values.into_iter()) { + batch + .put_cf(column, key, value) + .map_err(RocksAdapterError::from)?; + } + + self.db.write(batch).map_err(RocksAdapterError::from)?; + Ok(()) + } + + async fn contains(&self, c: StorageCategory, key: Bytes) -> ProtocolResult { + let column = get_column(&self.db, c)?; + let v = self + .db + .get_cf(column, key) + .map_err(RocksAdapterError::from)?; + + Ok(v.is_some()) + } + + async fn remove(&self, c: StorageCategory, key: Bytes) -> ProtocolResult<()> { + let column = get_column(&self.db, c)?; + self.db + .delete_cf(column, key) + .map_err(RocksAdapterError::from)?; + Ok(()) + } + + async fn remove_batch(&self, c: StorageCategory, keys: Vec) -> ProtocolResult<()> { + let column = get_column(&self.db, c)?; + + let mut batch = WriteBatch::default(); + for key in keys { + batch + .delete_cf(column, key) + .map_err(RocksAdapterError::from)?; + } + + self.db.write(batch).map_err(RocksAdapterError::from)?; + Ok(()) + } +} + +const C_EPOCHS: &str = "c1"; +const C_SIGNED_TRANSACTIONS: &str = "c2"; +const C_RECEIPTS: &str = "c3"; + +fn map_category(c: StorageCategory) -> &'static str { + match c { + StorageCategory::Epoch => C_EPOCHS, + StorageCategory::Receipt => C_RECEIPTS, + StorageCategory::SignedTransaction => C_SIGNED_TRANSACTIONS, + } +} + +fn get_column(db: &DB, c: StorageCategory) -> Result { + let column = db + .cf_handle(map_category(c)) + .ok_or_else(|| RocksAdapterError::CategoryNotFound { c })?; + Ok(column) +} + +#[derive(Debug)] +pub enum RocksAdapterError { + CategoryNotFound { c: StorageCategory }, + RocksDB { error: rocksdb::Error }, + InsertParameter, +} + +impl Error for RocksAdapterError {} + +impl fmt::Display for RocksAdapterError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let printable = match self { + RocksAdapterError::CategoryNotFound { c } => format!("category {:?} not found", c), + RocksAdapterError::RocksDB { error } => format!("rocksdb {:?}", error), + RocksAdapterError::InsertParameter => "parameters do not match".to_owned(), + }; + write!(f, "{}", printable) + } +} + +impl From for ProtocolError { + fn from(err: RocksAdapterError) -> ProtocolError { + ProtocolError::new(ProtocolErrorKind::Storage, Box::new(err)) + } +} + +impl From for RocksAdapterError { + fn from(error: rocksdb::Error) -> Self { + RocksAdapterError::RocksDB { error } + } +} + +impl From for RocksAdapterError { + fn from(c: StorageCategory) -> Self { + RocksAdapterError::CategoryNotFound { c } + } +} diff --git a/core/storage/src/lib.rs b/core/storage/src/lib.rs index 31e1bb209..57bb1ab72 100644 --- a/core/storage/src/lib.rs +++ b/core/storage/src/lib.rs @@ -1,7 +1,47 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); +// Remove this clippy bug with async await is resolved. +// ISSUE: https://github.com/rust-lang/rust-clippy/issues/3988 +#![allow(clippy::needless_lifetimes)] +#![feature(async_await)] + +pub mod adapter; + +use std::sync::Arc; + +use async_trait::async_trait; + +use protocol::traits::{Storage, StorageAdapter, StorageCategory}; +use protocol::types::{Hash, SignedTransaction}; +use protocol::ProtocolResult; + +#[derive(Debug)] +pub struct ImplStorage { + adapter: Arc, +} + +impl ImplStorage { + pub fn new(adapter: Arc) -> Self { + Self { adapter } + } +} + +#[async_trait] +impl Storage for ImplStorage { + async fn insert_transactions(&self, _signed_txs: Vec) -> ProtocolResult<()> { + self.adapter + .insert_batch(StorageCategory::SignedTransaction, vec![], vec![]) + .await?; + Ok(()) + } + + async fn get_transaction_by_hash( + &self, + _tx_hash: Hash, + ) -> ProtocolResult> { + unimplemented!(); + // let adapter = Arc::clone(&self.adapter); + // + // async move { + // adapter.get(tx_hash.as_bytes()).await.unwrap(); + // } } } diff --git a/core/database/Cargo.toml b/protocol/Cargo.toml similarity index 73% rename from core/database/Cargo.toml rename to protocol/Cargo.toml index 261cef473..264c70f21 100644 --- a/core/database/Cargo.toml +++ b/protocol/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "core-database" +name = "protocol" version = "0.1.0" authors = ["Cryptape Technologies "] edition = "2018" @@ -7,3 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" +bytes = "0.4" +uint = "0.8" +hex = "0.3" diff --git a/protocol/codec/Cargo.toml b/protocol/codec/Cargo.toml deleted file mode 100644 index 7d362cfb7..000000000 --- a/protocol/codec/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "protocol-codec" -version = "0.1.0" -authors = ["Cryptape Technologies "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/protocol/codec/src/lib.rs b/protocol/codec/src/lib.rs deleted file mode 100644 index 31e1bb209..000000000 --- a/protocol/codec/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/core/database/src/lib.rs b/protocol/src/codec/mod.rs similarity index 100% rename from core/database/src/lib.rs rename to protocol/src/codec/mod.rs diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs new file mode 100644 index 000000000..0ed1ac157 --- /dev/null +++ b/protocol/src/lib.rs @@ -0,0 +1,54 @@ +#[macro_use] +extern crate uint; + +pub mod codec; +pub mod traits; +pub mod types; + +use std::error::Error; +use std::fmt; + +#[derive(Debug, Clone)] +pub enum ProtocolErrorKind { + // traits + API, + Bank, + Consensus, + Executor, + Mempool, + Network, + Storage, + + // codec + Codec, + + // types + Types, +} + +// refer to https://github.com/rust-lang/rust/blob/a17951c4f80eb5208030f91fdb4ae93919fa6b12/src/libstd/io/error.rs#L73 +#[derive(Debug)] +pub struct ProtocolError { + kind: ProtocolErrorKind, + error: Box, +} + +impl ProtocolError { + pub fn new(kind: ProtocolErrorKind, error: Box) -> Self { + Self { kind, error } + } +} + +impl Error for ProtocolError {} + +impl fmt::Display for ProtocolError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "[ProtocolError] Kind: {:?} Error: {:?}", + self.kind, self.error + ) + } +} + +pub type ProtocolResult = Result; diff --git a/protocol/src/traits/executor.rs b/protocol/src/traits/executor.rs new file mode 100644 index 000000000..a912314fc --- /dev/null +++ b/protocol/src/traits/executor.rs @@ -0,0 +1,15 @@ +use async_trait::async_trait; + +use crate::ProtocolResult; + +#[async_trait] +pub trait Executor: Send + Sync { + type Adapter: ExecutorAdapter; + + fn exec(&self) -> ProtocolResult<()>; +} + +#[async_trait] +pub trait ExecutorAdapter: Send + Sync { + fn get_epoch_header(&self) -> ProtocolResult<()>; +} diff --git a/protocol/src/traits/mod.rs b/protocol/src/traits/mod.rs new file mode 100644 index 000000000..085290179 --- /dev/null +++ b/protocol/src/traits/mod.rs @@ -0,0 +1,5 @@ +mod executor; +mod storage; + +pub use executor::{Executor, ExecutorAdapter}; +pub use storage::{Storage, StorageAdapter, StorageCategory}; diff --git a/protocol/src/traits/storage.rs b/protocol/src/traits/storage.rs new file mode 100644 index 000000000..da6544c71 --- /dev/null +++ b/protocol/src/traits/storage.rs @@ -0,0 +1,48 @@ +use async_trait::async_trait; +use bytes::Bytes; + +use crate::types::{Hash, SignedTransaction}; +use crate::ProtocolResult; + +#[derive(Debug, Copy, Clone)] +pub enum StorageCategory { + Epoch, + Receipt, + SignedTransaction, +} + +#[async_trait] +pub trait Storage: Send + Sync { + async fn insert_transactions(&self, signed_txs: Vec) -> ProtocolResult<()>; + + async fn get_transaction_by_hash( + &self, + tx_hash: Hash, + ) -> ProtocolResult>; +} + +#[async_trait] +pub trait StorageAdapter: Send + Sync { + async fn get(&self, c: StorageCategory, key: Bytes) -> ProtocolResult>; + + async fn get_batch( + &self, + c: StorageCategory, + keys: Vec, + ) -> ProtocolResult>>; + + async fn insert(&self, c: StorageCategory, key: Bytes, value: Bytes) -> ProtocolResult<()>; + + async fn insert_batch( + &self, + c: StorageCategory, + keys: Vec, + values: Vec, + ) -> ProtocolResult<()>; + + async fn contains(&self, c: StorageCategory, key: Bytes) -> ProtocolResult; + + async fn remove(&self, c: StorageCategory, key: Bytes) -> ProtocolResult<()>; + + async fn remove_batch(&self, c: StorageCategory, keys: Vec) -> ProtocolResult<()>; +} diff --git a/protocol/src/types/mod.rs b/protocol/src/types/mod.rs new file mode 100644 index 000000000..ab812c6d6 --- /dev/null +++ b/protocol/src/types/mod.rs @@ -0,0 +1,41 @@ +mod primitive; +mod transaction; + +use std::error::Error; +use std::fmt; + +use crate::{ProtocolError, ProtocolErrorKind}; + +pub use primitive::{Address, Hash}; +pub use transaction::{ContractType, Fee, RawTransaction, SignedTransaction, TransactionAction}; + +#[derive(Debug)] +pub enum TypesError { + HashLengthMismatch { expect: usize, real: usize }, + FromHex { error: hex::FromHexError }, +} + +impl Error for TypesError {} + +impl fmt::Display for TypesError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let printable = match self { + TypesError::HashLengthMismatch { expect, real } => { + format!("Expect {:?} to get {:?}.", expect, real) + } + TypesError::FromHex { error } => format!("{:?}.", error), + }; + write!(f, "{}", printable) + } +} +impl From for ProtocolError { + fn from(error: TypesError) -> ProtocolError { + ProtocolError::new(ProtocolErrorKind::Types, Box::new(error)) + } +} + +impl From for TypesError { + fn from(error: hex::FromHexError) -> Self { + TypesError::FromHex { error } + } +} diff --git a/protocol/src/types/primitive.rs b/protocol/src/types/primitive.rs new file mode 100644 index 000000000..36e34d434 --- /dev/null +++ b/protocol/src/types/primitive.rs @@ -0,0 +1,66 @@ +#![allow(clippy::all)] +construct_uint! { + pub struct Balance(4); +} + +use bytes::Bytes; + +use crate::types::TypesError; +use crate::ProtocolResult; + +const HASH_LEN: usize = 32; +const ADDRESS_LEN: usize = 21; + +#[derive(Clone, Debug)] +pub struct Hash([u8; HASH_LEN]); +#[derive(Clone, Debug)] +pub struct Address([u8; ADDRESS_LEN]); + +impl Hash { + pub fn from_bytes(bytes: Bytes) -> ProtocolResult { + if bytes.len() != ADDRESS_LEN { + return Err(TypesError::HashLengthMismatch { + expect: HASH_LEN, + real: bytes.len(), + } + .into()); + } + + let mut out = [0u8; HASH_LEN]; + out.copy_from_slice(&bytes); + Ok(Self::from_fixed_bytes(out)) + } + + pub fn from_fixed_bytes(bytes: [u8; HASH_LEN]) -> Self { + Self(bytes) + } + + pub fn from_hex(s: &str) -> ProtocolResult { + let s = clean_0x(s); + let bytes = hex::decode(s).map_err(TypesError::from)?; + + let mut out = [0u8; HASH_LEN]; + out.copy_from_slice(&bytes); + Ok(Self::from_fixed_bytes(out)) + } + + pub fn as_bytes(&self) -> Bytes { + Bytes::from(self.0.as_ref()) + } + + pub fn into_fixed_bytes(self) -> [u8; HASH_LEN] { + self.0 + } + + pub fn as_hex(&self) -> String { + hex::encode(self.0) + } +} + +pub fn clean_0x(s: &str) -> &str { + if s.starts_with("0x") { + &s[2..] + } else { + s + } +} diff --git a/protocol/src/types/transaction.rs b/protocol/src/types/transaction.rs new file mode 100644 index 000000000..8e08a549e --- /dev/null +++ b/protocol/src/types/transaction.rs @@ -0,0 +1,56 @@ +use crate::types::primitive::{Address, Balance, Hash}; + +#[derive(Clone, Debug)] +pub struct RawTransaction { + pub chain_id: Hash, + pub nonce: Hash, + pub timeout: u64, + pub fee: Fee, + pub action: TransactionAction, +} + +#[derive(Clone, Debug)] +pub struct Fee { + pub asset_id: Hash, + pub cycle: u64, +} + +#[derive(Clone, Debug)] +pub enum TransactionAction { + Transfer { + receiver: Address, + asset_id: Hash, + amount: Balance, + }, + Approve { + spender: Address, + asset_id: Hash, + max: Balance, + }, + Deploy { + code: Vec, + contract_type: ContractType, + }, + Call { + contract: Address, + method: String, + args: Vec, + asset_id: Hash, + amount: Balance, + }, +} + +#[derive(Clone, Debug)] +pub struct SignedTransaction { + pub raw: RawTransaction, + pub tx_hash: Hash, + pub pubkey: Vec, + pub signature: Vec, +} + +#[derive(Clone, Debug)] +pub enum ContractType { + Asset, + Library, + App, +} diff --git a/protocol/traits/Cargo.toml b/protocol/traits/Cargo.toml deleted file mode 100644 index b9586b8cc..000000000 --- a/protocol/traits/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "protocol-traits" -version = "0.1.0" -authors = ["Cryptape Technologies "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/protocol/traits/src/lib.rs b/protocol/traits/src/lib.rs deleted file mode 100644 index 31e1bb209..000000000 --- a/protocol/traits/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/protocol/types/Cargo.toml b/protocol/types/Cargo.toml deleted file mode 100644 index f50355345..000000000 --- a/protocol/types/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "protocol-types" -version = "0.1.0" -authors = ["Cryptape Technologies "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/protocol/types/src/lib.rs b/protocol/types/src/lib.rs deleted file mode 100644 index 31e1bb209..000000000 --- a/protocol/types/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/src/main.rs b/src/main.rs index f4d029783..b13a46e27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,17 @@ #![feature(async_await)] -use runtime::net::UdpSocket; +use std::sync::Arc; -#[runtime::main] -async fn main() -> std::io::Result<()> { - let mut socket = UdpSocket::bind("127.0.0.1:8080")?; - let mut buf = vec![0u8; 1024]; +use core_storage::{adapter::rocks::RocksAdapter, ImplStorage}; +use protocol::traits::Storage; +use protocol::ProtocolError; - println!("Listening on {}", socket.local_addr()?); +#[runtime::main] +async fn main() -> Result<(), ProtocolError> { + let storage = ImplStorage::new(Arc::new(RocksAdapter::new( + "./devtools/data/storage".to_owned(), + )?)); + storage.insert_transactions(vec![]).await?; - loop { - let (recv, peer) = socket.recv_from(&mut buf).await?; - let sent = socket.send_to(&buf[..recv], &peer).await?; - println!("Sent {} out of {} bytes to {}", sent, recv, peer); - } + Ok(()) }