Skip to content

Commit

Permalink
Update README.md
Browse files Browse the repository at this point in the history
  • Loading branch information
yejiayu committed Oct 31, 2019
1 parent 6350a3a commit 75eb0b1
Show file tree
Hide file tree
Showing 22 changed files with 546 additions and 69 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ Cargo.lock
# IDE
.idea/
.vscode/

# dev
devtools/data
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ authors = ["Cryptape Technologies <[email protected]>"]
edition = "2018"

[dependencies]
protocol = { path = "./protocol" }
core-storage = { path = "./core/storage" }

runtime = "0.3.0-alpha.6"

[workspace]
Expand All @@ -20,13 +23,10 @@ members = [
"core/api",
"core/bank",
"core/consensus",
"core/database",
"core/executor",
"core/mempool",
"core/network",
"core/storage",

"protocol/codec",
"protocol/traits",
"protocol/types",
"protocol",
]
6 changes: 6 additions & 0 deletions core/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
protocol = { path = "../../protocol" }

bytes = "0.4"
parking_lot = "0.9"
async-trait = "0.1"
rocksdb = "0.12"
1 change: 1 addition & 0 deletions core/storage/src/adapter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod rocks;
186 changes: 186 additions & 0 deletions core/storage/src/adapter/rocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use std::error::Error;
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use rocksdb::{ColumnFamily, Options, WriteBatch, DB};

use protocol::traits::{StorageAdapter, StorageCategory};
use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult};

#[derive(Debug)]
pub struct RocksAdapter {
db: Arc<DB>,
}

impl RocksAdapter {
pub fn new(path: String) -> ProtocolResult<Self> {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);

let categories = [
map_category(StorageCategory::Epoch),
map_category(StorageCategory::Receipt),
map_category(StorageCategory::SignedTransaction),
];

let db = DB::open_cf(&opts, path, categories.iter()).map_err(RocksAdapterError::from)?;

Ok(RocksAdapter { db: Arc::new(db) })
}
}

#[async_trait]
impl StorageAdapter for RocksAdapter {
async fn get(&self, c: StorageCategory, key: Bytes) -> ProtocolResult<Option<Bytes>> {
let column = get_column(&self.db, c)?;
let v = self
.db
.get_cf(column, key)
.map_err(RocksAdapterError::from)?;

Ok(v.map(|v| Bytes::from(v.to_vec())))
}

async fn get_batch(
&self,
c: StorageCategory,
keys: Vec<Bytes>,
) -> ProtocolResult<Vec<Option<Bytes>>> {
let column = get_column(&self.db, c)?;

let mut values = Vec::with_capacity(keys.len());
for key in keys {
let v = self
.db
.get_cf(column, key)
.map_err(RocksAdapterError::from)?;

values.push(v.map(|v| Bytes::from(v.to_vec())));
}

Ok(values)
}

async fn insert(&self, c: StorageCategory, key: Bytes, value: Bytes) -> ProtocolResult<()> {
let column = get_column(&self.db, c)?;
self.db
.put_cf(column, key.to_vec(), value.to_vec())
.map_err(RocksAdapterError::from)?;
Ok(())
}

async fn insert_batch(
&self,
c: StorageCategory,
keys: Vec<Bytes>,
values: Vec<Bytes>,
) -> ProtocolResult<()> {
if keys.len() != values.len() {
return Err(RocksAdapterError::InsertParameter.into());
}

let column = get_column(&self.db, c)?;

let mut batch = WriteBatch::default();
for (key, value) in keys.into_iter().zip(values.into_iter()) {
batch
.put_cf(column, key, value)
.map_err(RocksAdapterError::from)?;
}

self.db.write(batch).map_err(RocksAdapterError::from)?;
Ok(())
}

async fn contains(&self, c: StorageCategory, key: Bytes) -> ProtocolResult<bool> {
let column = get_column(&self.db, c)?;
let v = self
.db
.get_cf(column, key)
.map_err(RocksAdapterError::from)?;

Ok(v.is_some())
}

async fn remove(&self, c: StorageCategory, key: Bytes) -> ProtocolResult<()> {
let column = get_column(&self.db, c)?;
self.db
.delete_cf(column, key)
.map_err(RocksAdapterError::from)?;
Ok(())
}

async fn remove_batch(&self, c: StorageCategory, keys: Vec<Bytes>) -> ProtocolResult<()> {
let column = get_column(&self.db, c)?;

let mut batch = WriteBatch::default();
for key in keys {
batch
.delete_cf(column, key)
.map_err(RocksAdapterError::from)?;
}

self.db.write(batch).map_err(RocksAdapterError::from)?;
Ok(())
}
}

const C_EPOCHS: &str = "c1";
const C_SIGNED_TRANSACTIONS: &str = "c2";
const C_RECEIPTS: &str = "c3";

fn map_category(c: StorageCategory) -> &'static str {
match c {
StorageCategory::Epoch => C_EPOCHS,
StorageCategory::Receipt => C_RECEIPTS,
StorageCategory::SignedTransaction => C_SIGNED_TRANSACTIONS,
}
}

fn get_column(db: &DB, c: StorageCategory) -> Result<ColumnFamily, RocksAdapterError> {
let column = db
.cf_handle(map_category(c))
.ok_or_else(|| RocksAdapterError::CategoryNotFound { c })?;
Ok(column)
}

#[derive(Debug)]
pub enum RocksAdapterError {
CategoryNotFound { c: StorageCategory },
RocksDB { error: rocksdb::Error },
InsertParameter,
}

impl Error for RocksAdapterError {}

impl fmt::Display for RocksAdapterError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let printable = match self {
RocksAdapterError::CategoryNotFound { c } => format!("category {:?} not found", c),
RocksAdapterError::RocksDB { error } => format!("rocksdb {:?}", error),
RocksAdapterError::InsertParameter => "parameters do not match".to_owned(),
};
write!(f, "{}", printable)
}
}

impl From<RocksAdapterError> for ProtocolError {
fn from(err: RocksAdapterError) -> ProtocolError {
ProtocolError::new(ProtocolErrorKind::Storage, Box::new(err))
}
}

impl From<rocksdb::Error> for RocksAdapterError {
fn from(error: rocksdb::Error) -> Self {
RocksAdapterError::RocksDB { error }
}
}

impl From<StorageCategory> for RocksAdapterError {
fn from(c: StorageCategory) -> Self {
RocksAdapterError::CategoryNotFound { c }
}
}
50 changes: 45 additions & 5 deletions core/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,47 @@
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
// Remove this clippy bug with async await is resolved.
// ISSUE: https://github.com/rust-lang/rust-clippy/issues/3988
#![allow(clippy::needless_lifetimes)]
#![feature(async_await)]

pub mod adapter;

use std::sync::Arc;

use async_trait::async_trait;

use protocol::traits::{Storage, StorageAdapter, StorageCategory};
use protocol::types::{Hash, SignedTransaction};
use protocol::ProtocolResult;

#[derive(Debug)]
pub struct ImplStorage<Adapter> {
adapter: Arc<Adapter>,
}

impl<Adapter: StorageAdapter> ImplStorage<Adapter> {
pub fn new(adapter: Arc<Adapter>) -> Self {
Self { adapter }
}
}

#[async_trait]
impl<Adapter: StorageAdapter> Storage<Adapter> for ImplStorage<Adapter> {
async fn insert_transactions(&self, _signed_txs: Vec<SignedTransaction>) -> ProtocolResult<()> {
self.adapter
.insert_batch(StorageCategory::SignedTransaction, vec![], vec![])
.await?;
Ok(())
}

async fn get_transaction_by_hash(
&self,
_tx_hash: Hash,
) -> ProtocolResult<Option<SignedTransaction>> {
unimplemented!();
// let adapter = Arc::clone(&self.adapter);
//
// async move {
// adapter.get(tx_hash.as_bytes()).await.unwrap();
// }
}
}
6 changes: 5 additions & 1 deletion core/database/Cargo.toml → protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
[package]
name = "core-database"
name = "protocol"
version = "0.1.0"
authors = ["Cryptape Technologies <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1"
bytes = "0.4"
uint = "0.8"
hex = "0.3"
9 changes: 0 additions & 9 deletions protocol/codec/Cargo.toml

This file was deleted.

7 changes: 0 additions & 7 deletions protocol/codec/src/lib.rs

This file was deleted.

File renamed without changes.
54 changes: 54 additions & 0 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#[macro_use]
extern crate uint;

pub mod codec;
pub mod traits;
pub mod types;

use std::error::Error;
use std::fmt;

#[derive(Debug, Clone)]
pub enum ProtocolErrorKind {
// traits
API,
Bank,
Consensus,
Executor,
Mempool,
Network,
Storage,

// codec
Codec,

// types
Types,
}

// refer to https://github.com/rust-lang/rust/blob/a17951c4f80eb5208030f91fdb4ae93919fa6b12/src/libstd/io/error.rs#L73
#[derive(Debug)]
pub struct ProtocolError {
kind: ProtocolErrorKind,
error: Box<dyn Error + Send>,
}

impl ProtocolError {
pub fn new(kind: ProtocolErrorKind, error: Box<dyn Error + Send>) -> Self {
Self { kind, error }
}
}

impl Error for ProtocolError {}

impl fmt::Display for ProtocolError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"[ProtocolError] Kind: {:?} Error: {:?}",
self.kind, self.error
)
}
}

pub type ProtocolResult<T> = Result<T, ProtocolError>;
15 changes: 15 additions & 0 deletions protocol/src/traits/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use async_trait::async_trait;

use crate::ProtocolResult;

#[async_trait]
pub trait Executor<Adapter: ExecutorAdapter>: Send + Sync {
type Adapter: ExecutorAdapter;

fn exec(&self) -> ProtocolResult<()>;
}

#[async_trait]
pub trait ExecutorAdapter: Send + Sync {
fn get_epoch_header(&self) -> ProtocolResult<()>;
}
5 changes: 5 additions & 0 deletions protocol/src/traits/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod executor;
mod storage;

pub use executor::{Executor, ExecutorAdapter};
pub use storage::{Storage, StorageAdapter, StorageCategory};
Loading

0 comments on commit 75eb0b1

Please sign in to comment.