Skip to content

Commit

Permalink
feat(mempool): Implement MemPool interfaces (#8)
Browse files Browse the repository at this point in the history
* complete TxCache primitive functions and benchamarks, use concurrent_count to deal with concurrent insertion with package

* define HashMemPool

* implement MemPool interface

* fix interfaces of MemPoolAdapter

* cargo clippy

* remove RwLock

* use self-define Map instead of CHashMap which may panic in high load

* add Enum Stage

* add annotations and remove cycle_limit param of package

* simplify gen_tx

* add Cargo.lock

* update nightly version to 2019-08-26 and use crate num-bigint instead of uint

* optimize code style

* change current_h to current_epoch_id and add comment on sync_propose_txs

* change all panic to error and throw them up
  • Loading branch information
rev-chaos authored and yejiayu committed Oct 31, 2019
1 parent 81ff4f3 commit 0b18250
Show file tree
Hide file tree
Showing 14 changed files with 960 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
protocol = { path = "./protocol" }
core-storage = { path = "./core/storage" }

runtime = "0.3.0-alpha.6"
runtime = "0.3.0-alpha.7"

[workspace]
members = [
Expand Down
17 changes: 17 additions & 0 deletions core/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,20 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
protocol = { path = "../../protocol" }

futures-preview = "0.3.0-alpha.18"
runtime = "0.3.0-alpha.7"
crossbeam-queue = "0.1"
derive_more = "0.15"
async-trait = "0.1"
parking_lot = "0.8"
bytes = "0.4"
rayon = "1.0"
hex = "0.3"

[dev-dependencies]
num-traits = "0.2"
chashmap = "2.2"
rand = "0.7.0"

37 changes: 37 additions & 0 deletions core/mempool/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::error::Error;

use derive_more::{Display, From};

use protocol::types::Hash;
use protocol::{ProtocolError, ProtocolErrorKind};

#[derive(Debug, Display, From)]
pub enum MemPoolError {
#[display(fmt = "Tx: {:?} insert failed", tx_hash)]
Insert { tx_hash: Hash },
#[display(fmt = "Mempool reach limit: {}", pool_size)]
ReachLimit { pool_size: usize },
#[display(fmt = "Tx: {:?} exists in pool", tx_hash)]
Dup { tx_hash: Hash },
#[display(fmt = "Pull {} tx_hashes, return {} signed_txs", require, response)]
EnsureBreak { require: usize, response: usize },
#[display(
fmt = "Return mismatch number of full transaction, require: {}, response: {}. This should not happen!",
require,
response
)]
MisMatch { require: usize, response: usize },
#[display(
fmt = "Transaction insert into candidate queue with len: {} failed which should not happen!",
len
)]
InsertCandidate { len: usize },
}

impl Error for MemPoolError {}

impl From<MemPoolError> for ProtocolError {
fn from(error: MemPoolError) -> ProtocolError {
ProtocolError::new(ProtocolErrorKind::Mempool, Box::new(error))
}
}
194 changes: 189 additions & 5 deletions core/mempool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,191 @@
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
#![feature(test)]

mod error;
mod map;
mod tx_cache;

use std::sync::atomic::{AtomicU64, Ordering};

use async_trait::async_trait;

use protocol::traits::{Context, MemPool, MemPoolAdapter, MixedTxHashes};
use protocol::types::{Hash, SignedTransaction};
use protocol::ProtocolResult;

use crate::error::MemPoolError;
use crate::map::Map;
use crate::tx_cache::TxCache;

/// Memory pool for caching transactions.
struct HashMemPool<Adapter: MemPoolAdapter> {
/// Pool size limit.
pool_size: usize,
/// A system param limits the life time of an off-chain transaction.
timeout_gap: u64,
/// The max cycles accumulated in an `Epoch`.
cycle_limit: u64,
/// A structure for caching new transactions and responsible transactions of
/// propose-sync.
tx_cache: TxCache,
/// A structure for caching fresh transactions in order transaction hashes.
callback_cache: Map<SignedTransaction>,
/// Supply necessary functions from outer modules.
adapter: Adapter,
/// Current epoch_id.
current_epoch_id: AtomicU64,
}

impl<Adapter> HashMemPool<Adapter>
where
Adapter: MemPoolAdapter,
{
#[allow(dead_code)]
pub fn new(
pool_size: usize,
timeout_gap: u64,
cycle_limit: u64,
current_epoch_id: u64,
adapter: Adapter,
) -> Self {
HashMemPool {
pool_size,
timeout_gap,
cycle_limit,
tx_cache: TxCache::new(pool_size),
callback_cache: Map::new(pool_size),
adapter,
current_epoch_id: AtomicU64::new(current_epoch_id),
}
}
}

#[async_trait]
impl<Adapter> MemPool<Adapter> for HashMemPool<Adapter>
where
Adapter: MemPoolAdapter,
{
async fn insert(&self, ctx: Context, tx: SignedTransaction) -> ProtocolResult<()> {
let tx_hash = &tx.tx_hash;
// 1. check size
if self.tx_cache.len() >= self.pool_size {
return Err(MemPoolError::ReachLimit {
pool_size: self.pool_size,
}
.into());
}
// 2. check pool exist
if self.tx_cache.contain(tx_hash) {
return Err(MemPoolError::Dup {
tx_hash: tx_hash.clone(),
}
.into());
}

// 3. check signature
self.adapter
.check_signature(ctx.clone(), tx.clone())
.await?;

// 4. check transaction
self.adapter
.check_transaction(ctx.clone(), tx.clone())
.await?;

// 5. check storage exist
self.adapter
.check_storage_exist(ctx.clone(), tx_hash.clone())
.await?;

// 6. do insert
self.tx_cache.insert_new_tx(tx.clone())?;

// 7. network broadcast
self.adapter.broadcast_tx(ctx.clone(), tx).await?;

Ok(())
}

async fn package(&self, _ctx: Context) -> ProtocolResult<MixedTxHashes> {
let current_epoch_id = self.current_epoch_id.load(Ordering::SeqCst);
self.tx_cache.package(
self.cycle_limit,
current_epoch_id,
current_epoch_id + self.timeout_gap,
)
}

async fn flush(&self, _ctx: Context, tx_hashes: Vec<Hash>) -> ProtocolResult<()> {
self.tx_cache.flush(&tx_hashes);
self.callback_cache.clear();
Ok(())
}

async fn get_full_txs(
&self,
_ctx: Context,
tx_hashes: Vec<Hash>,
) -> ProtocolResult<Vec<SignedTransaction>> {
let len = tx_hashes.len();
let mut full_txs = Vec::with_capacity(len);

for tx_hash in tx_hashes {
if let Some(tx) = self.tx_cache.get(&tx_hash) {
full_txs.push(tx);
} else if let Some(tx) = self.callback_cache.get(&tx_hash) {
full_txs.push(tx);
}
}

if full_txs.len() != len {
Err(MemPoolError::MisMatch {
require: len,
response: full_txs.len(),
}
.into())
} else {
Ok(full_txs)
}
}

async fn ensure_order_txs(
&self,
ctx: Context,
order_tx_hashes: Vec<Hash>,
) -> ProtocolResult<()> {
let unknown_hashes = self.tx_cache.show_unknown(order_tx_hashes);
if !unknown_hashes.is_empty() {
let unknown_len = unknown_hashes.len();
let txs = self.adapter.pull_txs(ctx.clone(), unknown_hashes).await?;
// Make sure response signed_txs is the same size of request hashes.
if txs.len() != unknown_len {
return Err(MemPoolError::EnsureBreak {
require: unknown_len,
response: txs.len(),
}
.into());
}
txs.into_iter().for_each(|tx| {
self.callback_cache.insert(tx.tx_hash.clone(), tx);
});
}

Ok(())
}

async fn sync_propose_txs(
&self,
ctx: Context,
propose_tx_hashes: Vec<Hash>,
) -> ProtocolResult<()> {
let unknown_hashes = self.tx_cache.show_unknown(propose_tx_hashes);
if !unknown_hashes.is_empty() {
let txs = self.adapter.pull_txs(ctx.clone(), unknown_hashes).await?;
txs.into_iter().for_each(|tx| {
// Should not handle error here, it is normal that transactions response here
// are exist in pool.
let _ = self.tx_cache.insert_propose_tx(tx);
});
}
Ok(())
}
}
Loading

0 comments on commit 0b18250

Please sign in to comment.