Skip to content

Commit

Permalink
feat(mempool): Develop mempool's tests and benches (nervosnetwork#9)
Browse files Browse the repository at this point in the history
* add adapter, crypto, env, utils

* add env, utils

* refactor test and bench

* add test_mempool_package test_mempool_flush

* adjust code style

* add ophelia

* fix ophelia unstable feature

* combine all test files into only one file

* add some new tests and refer ophelia to common-crypto

* add test_sync_propose_txs
  • Loading branch information
rev-chaos authored and yejiayu committed Oct 31, 2019
1 parent 0b18250 commit 2af246a
Show file tree
Hide file tree
Showing 8 changed files with 646 additions and 88 deletions.
6 changes: 6 additions & 0 deletions common/crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ophelia-secp256k1 = { git = "https://github.com/zeroqn/ophelia.git" }
ophelia = { git = "https://github.com/zeroqn/ophelia.git" }

[features]
default = ["generate"]
generate = ["ophelia-secp256k1/generate", "ophelia/generate"]
11 changes: 4 additions & 7 deletions common/crypto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}
pub use ophelia::{Crypto, PrivateKey, PublicKey, Signature};
pub use ophelia_secp256k1::{
Secp256k1, Secp256k1PrivateKey, Secp256k1PublicKey, Secp256k1Signature,
};
8 changes: 6 additions & 2 deletions core/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@ edition = "2018"

[dependencies]
protocol = { path = "../../protocol" }
common-crypto = { path = "../../common/crypto"}

futures-preview = "0.3.0-alpha.18"
runtime-tokio = "0.3.0-alpha.6"
runtime = "0.3.0-alpha.7"
crossbeam-queue = "0.1"
derive_more = "0.15"
async-trait = "0.1"
parking_lot = "0.8"
num-traits = "0.2"
bytes = "0.4"
rayon = "1.0"
rand = "0.6"
hex = "0.3"

[dev-dependencies]
num-traits = "0.2"
chashmap = "2.2"
rand = "0.7.0"



37 changes: 0 additions & 37 deletions core/mempool/src/error.rs

This file was deleted.

74 changes: 47 additions & 27 deletions core/mempool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
#![feature(test)]

mod error;
mod map;
mod test;
mod tx_cache;

use std::error::Error;
use std::sync::atomic::{AtomicU64, Ordering};

use async_trait::async_trait;
use derive_more::{Display, From};

use protocol::traits::{Context, MemPool, MemPoolAdapter, MixedTxHashes};
use protocol::types::{Hash, SignedTransaction};
use protocol::ProtocolResult;
use protocol::{ProtocolError, ProtocolErrorKind, ProtocolResult};

use crate::error::MemPoolError;
use crate::map::Map;
use crate::tx_cache::TxCache;

/// Memory pool for caching transactions.
struct HashMemPool<Adapter: MemPoolAdapter> {
pub struct HashMemPool<Adapter: MemPoolAdapter> {
/// Pool size limit.
pool_size: usize,
/// A system param limits the life time of an off-chain transaction.
Expand Down Expand Up @@ -57,6 +58,18 @@ where
current_epoch_id: AtomicU64::new(current_epoch_id),
}
}

pub fn get_tx_cache(&self) -> &TxCache {
&self.tx_cache
}

pub fn get_callback_cache(&self) -> &Map<SignedTransaction> {
&self.callback_cache
}

pub fn get_adapter(&self) -> &Adapter {
&self.adapter
}
}

#[async_trait]
Expand All @@ -66,40 +79,19 @@ where
{
async fn insert(&self, ctx: Context, tx: SignedTransaction) -> ProtocolResult<()> {
let tx_hash = &tx.tx_hash;
// 1. check size
if self.tx_cache.len() >= self.pool_size {
return Err(MemPoolError::ReachLimit {
pool_size: self.pool_size,
}
.into());
}
// 2. check pool exist
if self.tx_cache.contain(tx_hash) {
return Err(MemPoolError::Dup {
tx_hash: tx_hash.clone(),
}
.into());
}

// 3. check signature
self.tx_cache.check_reach_limit(self.pool_size)?;
self.tx_cache.check_exist(tx_hash)?;
self.adapter
.check_signature(ctx.clone(), tx.clone())
.await?;

// 4. check transaction
self.adapter
.check_transaction(ctx.clone(), tx.clone())
.await?;

// 5. check storage exist
self.adapter
.check_storage_exist(ctx.clone(), tx_hash.clone())
.await?;

// 6. do insert
self.tx_cache.insert_new_tx(tx.clone())?;

// 7. network broadcast
self.adapter.broadcast_tx(ctx.clone(), tx).await?;

Ok(())
Expand Down Expand Up @@ -189,3 +181,31 @@ where
Ok(())
}
}

#[derive(Debug, Display, From)]
pub enum MemPoolError {
#[display(fmt = "Tx: {:?} inserts failed", tx_hash)]
Insert { tx_hash: Hash },
#[display(fmt = "Mempool reaches limit: {}", pool_size)]
ReachLimit { pool_size: usize },
#[display(fmt = "Tx: {:?} exists in pool", tx_hash)]
Dup { tx_hash: Hash },
#[display(fmt = "Pull txs, require: {}, response: {}", require, response)]
EnsureBreak { require: usize, response: usize },
#[display(fmt = "Fetch full txs, require: {}, response: {}", require, response)]
MisMatch { require: usize, response: usize },
#[display(fmt = "Tx inserts candidate_queue failed, len: {}", len)]
InsertCandidate { len: usize },
#[display(fmt = "Tx: {:?} check_sig failed", tx_hash)]
CheckSig { tx_hash: Hash },
#[display(fmt = "Check_hash failed, expect: {:?}, get: {:?}", expect, actual)]
CheckHash { expect: Hash, actual: Hash },
}

impl Error for MemPoolError {}

impl From<MemPoolError> for ProtocolError {
fn from(error: MemPoolError) -> ProtocolError {
ProtocolError::new(ProtocolErrorKind::Mempool, Box::new(error))
}
}
16 changes: 8 additions & 8 deletions core/mempool/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ mod tests {

use crate::map::Map;

const GEN_TX_SIZE: usize = 100_000;
const GEN_TX_SIZE: usize = 1000;

#[bench]
fn bench_insert_sharding(b: &mut Bencher) {
let txs = gen_txs(GEN_TX_SIZE);
fn bench_map_insert(b: &mut Bencher) {
let txs = mock_txs(GEN_TX_SIZE);

b.iter(move || {
let cache = Map::new(GEN_TX_SIZE);
Expand All @@ -158,8 +158,8 @@ mod tests {
}

#[bench]
fn bench_insert_std(b: &mut Bencher) {
let txs = gen_txs(GEN_TX_SIZE);
fn bench_std_map_insert(b: &mut Bencher) {
let txs = mock_txs(GEN_TX_SIZE);

b.iter(move || {
let cache = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -170,8 +170,8 @@ mod tests {
}

#[bench]
fn bench_insert_chashmap(b: &mut Bencher) {
let txs = gen_txs(GEN_TX_SIZE);
fn bench_chashmap_insert(b: &mut Bencher) {
let txs = mock_txs(GEN_TX_SIZE);

b.iter(move || {
let cache = CHashMap::new();
Expand All @@ -181,7 +181,7 @@ mod tests {
});
}

fn gen_txs(size: usize) -> Vec<(Hash, Hash)> {
fn mock_txs(size: usize) -> Vec<(Hash, Hash)> {
let mut txs = Vec::with_capacity(size);
for _ in 0..size {
let tx: Vec<u8> = (0..10).map(|_| random::<u8>()).collect();
Expand Down
Loading

0 comments on commit 2af246a

Please sign in to comment.