Skip to content

Commit

Permalink
refactor: Taste runtime crate. (nervosnetwork#268)
Browse files Browse the repository at this point in the history
* chore(core-network): switch to runtime crate

* chore(main): wrap main in async runtime

* fix(main): clippy false-positives on needless lifetime

* chore(core-pubsub): replace std::thread::spawn with runtime spawn

* doc(core-pubsub): use runtime::spawn

* test(core-pubsub): use runtime::spawn

* chore(comp-tx-pool): use runtime::spawn

* fix(comp-tx-pool): some tests timeout

Switch from native runtime to tokio runtime

* chore(core-consensus): use runtime::spawn

* test(comp-tx-pool): speed up broadcast_txs test

* test(core-network): use runtime::test, remove block_on

* style(main): consistent struct init

* fix(comp-tx-pool): block on cache_broadcast_receiver

Use await instead

* style(comp-tx-pool): use ```for_each``` instead of loop in cache_broadcast_txs
  • Loading branch information
zeroqn authored and yejiayu committed Jun 5, 2019
1 parent 4fc5437 commit 4479c3e
Show file tree
Hide file tree
Showing 23 changed files with 702 additions and 475 deletions.
227 changes: 222 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ core-network = { path = "core/network" }

rayon = "1.0"
hex = "0.3"
tokio = "0.1"
clap = "2.32"
log = "0.4"
num_cpus = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
futures-preview = { version = "0.3.0-alpha.16", features = [ "compat" ] }
futures-preview = "0.3.0-alpha.16"
runtime = "0.3.0-alpha.4"
runtime-tokio = "0.3.0-alpha.4"

[workspace]
members = [
Expand Down
2 changes: 2 additions & 0 deletions components/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ parking_lot = "0.8"
log = "0.4"
futures-preview = "0.3.0-alpha.16"
rayon = "1.0"
runtime = "0.3.0-alpha.4"

[dev-dependencies]
components-database = { path = "../database" }
hex = "0.3"
uuid = { version = "0.7", features = ["serde", "v4"] }
chashmap = "2.2"
runtime-tokio = "0.3.0-alpha.4"
131 changes: 78 additions & 53 deletions components/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::string::ToString;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use futures::{future::ready, prelude::StreamExt};

use common_channel::{unbounded, Receiver, Sender};
use core_context::{Context, ORIGIN};
use core_crypto::Crypto;
Expand Down Expand Up @@ -52,7 +54,7 @@ where
let (cache_broadcast_sender, cache_broadcast_receiver) = unbounded();
let network2 = network.clone();

std::thread::spawn(move || cache_broadcast_txs(network2, cache_broadcast_receiver));
runtime::spawn(cache_broadcast_txs(network2, cache_broadcast_receiver));

HashTransactionPool {
pool_size,
Expand Down Expand Up @@ -290,24 +292,24 @@ fn internal_error(e: impl ToString) -> TransactionPoolError {

// TODO: If the number of transactions does not satisfy "CACHE_BROOADCAST_LEN",
// does it need to set up a timed broadcast?
fn cache_broadcast_txs<N: Network>(network: N, receiver: Receiver<SignedTransaction>) {
async fn cache_broadcast_txs<N: Network>(network: N, receiver: Receiver<SignedTransaction>) {
let mut buffer_txs: Vec<SignedTransaction> = Vec::with_capacity(CACHE_BROOADCAST_LEN);

loop {
let push_may_broadcast = move |tx: SignedTransaction| {
buffer_txs.push(tx);

if buffer_txs.len() >= CACHE_BROOADCAST_LEN {
let mut temp = Vec::with_capacity(CACHE_BROOADCAST_LEN);
mem::swap(&mut buffer_txs, &mut temp);

network.broadcast_batch(temp);
}

match receiver.recv() {
Ok(tx) => {
buffer_txs.push(tx);
}
Err(e) => log::error!("cache broadcast receiver {:?}", e),
}
}
ready(())
};

receiver.for_each(push_may_broadcast).await;
log::error!("component: [tx_pool]: cache broadcast channel disconnected");
}

#[cfg(test)]
Expand Down Expand Up @@ -358,8 +360,8 @@ mod tests {
}
}

#[test]
fn test_insert_transaction() {
#[runtime::test]
async fn test_insert_transaction() {
let ctx = Context::new();
let pool_size = 1000;
let until_block_limit = 100;
Expand All @@ -377,7 +379,7 @@ mod tests {

// test normal
let untx = mock_transaction(100, height + until_block_limit, "test_normal".to_owned());
let signed_tx = block_on(tx_pool.insert(ctx.clone(), untx.clone())).unwrap();
let signed_tx = tx_pool.insert(ctx.clone(), untx.clone()).await.unwrap();
assert_eq!(
signed_tx.hash,
Into::<CitaUnverifiedTransaction>::into(untx)
Expand All @@ -388,7 +390,7 @@ mod tests {

// test lt valid_until_block
let untx = mock_transaction(100, height, "test_lt_quota_limit".to_owned());
let result = block_on(tx_pool.insert(ctx.clone(), untx));
let result = tx_pool.insert(ctx.clone(), untx).await;
assert_eq!(result, Err(TransactionPoolError::InvalidUntilBlock));

// test gt valid_until_block
Expand All @@ -397,7 +399,7 @@ mod tests {
height + until_block_limit * 2,
"test_gt_valid_until_block".to_owned(),
);
let result = block_on(tx_pool.insert(ctx.clone(), untx));
let result = tx_pool.insert(ctx.clone(), untx).await;
assert_eq!(result, Err(TransactionPoolError::InvalidUntilBlock));

// test gt quota limit
Expand All @@ -406,19 +408,19 @@ mod tests {
height + until_block_limit,
"test_gt_quota_limit".to_owned(),
);
let result = block_on(tx_pool.insert(ctx.clone(), untx));
let result = tx_pool.insert(ctx.clone(), untx).await;
assert_eq!(result, Err(TransactionPoolError::QuotaNotEnough));

// test cache dup
let untx = mock_transaction(100, height + until_block_limit, "test_dup".to_owned());
let untx2 = untx.clone();
block_on(tx_pool.insert(ctx.clone(), untx)).unwrap();
let result = block_on(tx_pool.insert(ctx.clone(), untx2));
tx_pool.insert(ctx.clone(), untx).await.unwrap();
let result = tx_pool.insert(ctx.clone(), untx2).await;
assert_eq!(result, Err(TransactionPoolError::Dup));
}

#[test]
fn test_histories_dup() {
#[runtime::test]
async fn test_histories_dup() {
let ctx = Context::new();
let pool_size = 1000;
let until_block_limit = 100;
Expand All @@ -436,9 +438,12 @@ mod tests {
let mut block = Block::default();
block.header.height = height;

block_on(storage.insert_transactions(ctx.clone(), vec![signed_tx.clone()])).unwrap();
storage
.insert_transactions(ctx.clone(), vec![signed_tx.clone()])
.await
.unwrap();

block_on(storage.insert_block(ctx.clone(), block)).unwrap();
storage.insert_block(ctx.clone(), block).await.unwrap();

let tx_pool = new_test_pool(
ctx.clone(),
Expand All @@ -449,12 +454,13 @@ mod tests {
height,
);

let result = block_on(tx_pool.insert(ctx.clone(), signed_tx.untx));
let result = tx_pool.insert(ctx.clone(), signed_tx.untx).await;
assert_eq!(result, Err(TransactionPoolError::Dup));
}

#[test]
fn test_pool_size() {
// NOTE: only tokio can pass this test
#[runtime::test(runtime_tokio::Tokio)]
async fn test_pool_size() {
let ctx = Context::new();
let pool_size = 1;
let until_block_limit = 100;
Expand All @@ -471,7 +477,7 @@ mod tests {
);

let untx = mock_transaction(100, height + until_block_limit, "test1".to_owned());
let signed_tx = block_on(tx_pool.insert(ctx.clone(), untx.clone())).unwrap();
let signed_tx = tx_pool.insert(ctx.clone(), untx.clone()).await.unwrap();
assert_eq!(
signed_tx.hash,
Into::<CitaUnverifiedTransaction>::into(untx)
Expand All @@ -481,12 +487,13 @@ mod tests {
);

let untx = mock_transaction(100, height + until_block_limit, "test2".to_owned());
let result = block_on(tx_pool.insert(ctx.clone(), untx));
let result = tx_pool.insert(ctx.clone(), untx).await;
assert_eq!(result, Err(TransactionPoolError::ReachLimit));
}

#[test]
fn test_package_transaction_count() {
// NOTE: only tokio can pass this test
#[runtime::test(runtime_tokio::Tokio)]
async fn test_package_transaction_count() {
let ctx = Context::new();
let pool_size = 100;
let until_block_limit = 100;
Expand All @@ -510,11 +517,13 @@ mod tests {
.unwrap()
.hash();
tx_hashes.push(tx_hash.clone());
block_on(tx_pool.insert(ctx.clone(), untx.clone())).unwrap();
tx_pool.insert(ctx.clone(), untx.clone()).await.unwrap();
}

let pachage_tx_hashes =
block_on(tx_pool.package(ctx, tx_hashes.len() as u64, quota_limit)).unwrap();
let pachage_tx_hashes = tx_pool
.package(ctx, tx_hashes.len() as u64, quota_limit)
.await
.unwrap();
assert_eq!(tx_hashes.len(), pachage_tx_hashes.len());
assert_eq!(
tx_hashes
Expand All @@ -524,8 +533,8 @@ mod tests {
);
}

#[test]
fn test_flush() {
#[runtime::test]
async fn test_flush() {
let ctx = Context::new();
let pool_size = 1000;
let until_block_limit = 100;
Expand Down Expand Up @@ -567,21 +576,28 @@ mod tests {
.iter()
.map(|stx| stx.hash.clone())
.collect::<Vec<Hash>>();
let stxs = block_on(tx_pool.get_batch(ctx.clone(), test_hashes.as_slice())).unwrap();
let stxs = tx_pool
.get_batch(ctx.clone(), test_hashes.as_slice())
.await
.unwrap();
assert_eq!(stxs.len(), test_hashes.len());
assert_eq!(tx_pool.callback_cache.len(), test_hashes.len());

let test_hashes = pool_stxs
.iter()
.map(|stx| stx.hash.clone())
.collect::<Vec<Hash>>();
block_on(tx_pool.flush(ctx.clone(), test_hashes.as_slice())).unwrap();
tx_pool
.flush(ctx.clone(), test_hashes.as_slice())
.await
.unwrap();
assert_eq!(tx_pool.callback_cache.len(), 0);
assert_eq!(tx_pool.tx_cache.len(), 0);
}

#[test]
fn test_package_transaction_quota_limit() {
// NOTE: only tokio can pass this test
#[runtime::test(runtime_tokio::Tokio)]
async fn test_package_transaction_quota_limit() {
let ctx = Context::new();
let pool_size = 100;
let until_block_limit = 100;
Expand All @@ -605,16 +621,18 @@ mod tests {
.unwrap()
.hash();
tx_hashes.push(tx_hash.clone());
block_on(tx_pool.insert(ctx.clone(), untx)).unwrap();
tx_pool.insert(ctx.clone(), untx).await.unwrap();
}

let pachage_tx_hashes =
block_on(tx_pool.package(ctx, tx_hashes.len() as u64, quota_limit)).unwrap();
let pachage_tx_hashes = tx_pool
.package(ctx, tx_hashes.len() as u64, quota_limit)
.await
.unwrap();
assert_eq!(8, pachage_tx_hashes.len());
}

#[test]
fn test_ensure_partial_unknown_hashes() {
#[runtime::test]
async fn test_ensure_partial_unknown_hashes() {
let ctx = Context::new();
let pool_size = 1000;
let until_block_limit = 100;
Expand Down Expand Up @@ -642,10 +660,13 @@ mod tests {
untxs.push(untx);
}

block_on(tx_pool.insert(ctx.clone(), untxs[0].clone())).unwrap();
tx_pool.insert(ctx.clone(), untxs[0].clone()).await.unwrap();
assert_eq!(tx_pool.tx_cache.len(), 1);

block_on(tx_pool.ensure(ctx.clone(), tx_hashes.as_slice())).unwrap();
tx_pool
.ensure(ctx.clone(), tx_hashes.as_slice())
.await
.unwrap();
let callback_cache = tx_pool.callback_cache;

dbg!(callback_cache.len());
Expand All @@ -656,8 +677,8 @@ mod tests {
}
}

#[test]
fn test_ensure_full_known_hashes() {
#[runtime::test]
async fn test_ensure_full_known_hashes() {
let ctx = Context::new();
let pool_size = 1000;
let until_block_limit = 100;
Expand All @@ -683,16 +704,20 @@ mod tests {
.unwrap()
.hash(),
);
block_on(tx_pool.insert(ctx.clone(), untx)).unwrap();
tx_pool.insert(ctx.clone(), untx).await.unwrap();
}
assert_eq!(tx_pool.tx_cache.len(), 5);

block_on(tx_pool.ensure(ctx.clone(), tx_hashes.as_slice())).unwrap();
tx_pool
.ensure(ctx.clone(), tx_hashes.as_slice())
.await
.unwrap();
assert_eq!(tx_pool.callback_cache.len(), 0);
}

#[test]
fn test_broadcast_txs() {
// NOTE: use tokio runtime to speed up test
#[runtime::test(runtime_tokio::Tokio)]
async fn test_broadcast_txs() {
let ctx = Context::new().with_value(ORIGIN, TransactionOrigin::Jsonrpc);
let pool_size = 1000;
let until_block_limit = 100;
Expand All @@ -718,9 +743,9 @@ mod tests {
let untx = mock_transaction(100, height + until_block_limit, format!("test{}", i));

if i == CACHE_BROOADCAST_LEN {
block_on(tx_pool.insert(Context::new(), untx)).unwrap();
tx_pool.insert(Context::new(), untx).await.unwrap();
} else {
block_on(tx_pool.insert(ctx.clone(), untx)).unwrap();
tx_pool.insert(ctx.clone(), untx).await.unwrap();
}
}

Expand Down
Loading

0 comments on commit 4479c3e

Please sign in to comment.