Skip to content

Commit

Permalink
feat(mempool): implement cached batch txs broadcast (nervosnetwork#20)
Browse files Browse the repository at this point in the history
* feat(mempool): implement cached batch txs broadcast

* feat(mempool): split interval broadcaster into standalone task future

Remove lock on whole transaction cache

* fix(mempool): clippy warnings

* chore(mempool): split interval timer into standalone task

* fix(mempool): error on empty receiver channel in default adapter

It's ok when err_rx channel is empty

* fix(mempool): default adapter broadcast on None timer signal

Should check whether there is an indeed interval signal

* fix(mempool): panic in default adapter select macro

Add complete branch

* test(mempool): add interval broadcast unit tests

* chore(config): add mempool interval broadcast entries
  • Loading branch information
zeroqn authored and yejiayu committed Oct 31, 2019
1 parent 8b5e8ce commit 02c5d2e
Show file tree
Hide file tree
Showing 9 changed files with 434 additions and 18 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ protocol = { path = "../../protocol" }
common-crypto = { path = "../../common/crypto" }
core-network = { path = "../network" }

futures-preview = "0.3.0-alpha.19"
futures-preview = { version = "0.3.0-alpha.19", features = [ "async-await" ] }
runtime-tokio = "0.3.0-alpha.6"
runtime = "0.3.0-alpha.7"
crossbeam-queue = "0.1"
Expand All @@ -25,6 +25,8 @@ rand = "0.6"
hex = "0.3"
serde_derive = "1.0"
serde = "1.0"
futures-timer = "1.0"
log = "0.4"

[dev-dependencies]
chashmap = "2.2"
22 changes: 19 additions & 3 deletions core/mempool/src/adapter/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use futures::future::try_join_all;
use protocol::{
traits::{Context, MemPool, MessageHandler, Priority, Rpc},
types::{Hash, SignedTransaction},
Expand All @@ -16,8 +17,8 @@ pub const END_RESP_PULL_TXS: &str = "/rpc_resp/mempool/pull_txs";

#[derive(Debug, Serialize, Deserialize)]
pub struct MsgNewTxs {
#[serde(with = "core_network::serde")]
pub stx: SignedTransaction,
#[serde(with = "core_network::serde_multi")]
pub batch_stxs: Vec<SignedTransaction>,
}

pub struct NewTxsHandler<M> {
Expand All @@ -43,7 +44,22 @@ where
async fn process(&self, ctx: Context, msg: Self::Message) -> ProtocolResult<()> {
let ctx = ctx.mark_network_origin_new_txs();

self.mem_pool.insert(ctx, msg.stx).await
let insert_stx = |stx| -> _ {
let mem_pool = Arc::clone(&self.mem_pool);
let ctx = ctx.clone();

runtime::spawn(async move { mem_pool.insert(ctx, stx).await })
};

// Concurrently insert them
try_join_all(
msg.batch_stxs
.into_iter()
.map(insert_stx)
.collect::<Vec<_>>(),
)
.await
.map(|_| ())
}
}

Expand Down
Loading

0 comments on commit 02c5d2e

Please sign in to comment.