From 02c5d2e4470272a64c051fbacc7b4a630cad954a Mon Sep 17 00:00:00 2001 From: zeroqn Date: Mon, 28 Oct 2019 15:43:38 +0800 Subject: [PATCH] feat(mempool): implement cached batch txs broadcast (#20) * feat(mempool): implement cached batch txs broadcast * feat(mempool): split interval broadcaster into standalone task future Remove lock on whole transaction cache * fix(mempool): clippy warnings * chore(mempool): split interval timer into standalone task * fix(mempool): error on empty receiver channel in default adapter It's ok when err_rx channel is empty * fix(mempool): default adapter broadcast on None timer signal Should check whether there is an indeed interval signal * fix(mempool): panic in default adapter select macro Add complete branch * test(mempool): add interval broadcast unit tests * chore(config): add mempool interval broadcast entries --- Cargo.lock | 13 + core/mempool/Cargo.toml | 4 +- core/mempool/src/adapter/message.rs | 22 +- core/mempool/src/adapter/mod.rs | 392 +++++++++++++++++++++++++++- core/mempool/src/lib.rs | 1 + core/mempool/src/tests/mod.rs | 2 +- devtools/chain/config.toml | 2 + src/config.rs | 14 + src/main.rs | 2 + 9 files changed, 434 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1de1c216..dd72d3b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,7 +505,9 @@ dependencies = [ "crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "protocol 0.1.0", @@ -1009,6 +1011,16 @@ dependencies = [ "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-timer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-util-preview" version = "0.3.0-alpha.19" @@ -3772,6 +3784,7 @@ dependencies = [ "checksum futures-sink-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "86f148ef6b69f75bb610d4f9a2336d4fc88c4b5b67129d1a340dd0fd362efeec" "checksum futures-timer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8f9eb554aa23143abc64ec4d0016f038caf53bb7cbc3d91490835c54edc96550" "checksum futures-timer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "878f1d2fc31355fa02ed2372e741b0c17e58373341e6a122569b4623a14a7d33" +"checksum futures-timer 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2879f3aa8fd2f60d17ede13349e11d0c132d0daa1b44e061f133f8928ddfaeea" "checksum futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" diff --git a/core/mempool/Cargo.toml b/core/mempool/Cargo.toml index 858aed158..5c1afea37 100644 --- a/core/mempool/Cargo.toml +++ b/core/mempool/Cargo.toml @@ -11,7 +11,7 @@ protocol = { path = "../../protocol" } common-crypto = { path = "../../common/crypto" } core-network = { path = "../network" } -futures-preview = "0.3.0-alpha.19" +futures-preview = { version = "0.3.0-alpha.19", features = [ "async-await" ] } runtime-tokio = "0.3.0-alpha.6" runtime = "0.3.0-alpha.7" crossbeam-queue = "0.1" @@ -25,6 +25,8 @@ rand = "0.6" hex = "0.3" serde_derive = "1.0" serde = "1.0" +futures-timer = "1.0" +log = "0.4" [dev-dependencies] chashmap = "2.2" diff --git a/core/mempool/src/adapter/message.rs b/core/mempool/src/adapter/message.rs index b35383e8b..cb896bb9f 100644 --- a/core/mempool/src/adapter/message.rs +++ b/core/mempool/src/adapter/message.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use async_trait::async_trait; +use futures::future::try_join_all; use protocol::{ traits::{Context, MemPool, MessageHandler, Priority, Rpc}, types::{Hash, SignedTransaction}, @@ -16,8 +17,8 @@ pub const END_RESP_PULL_TXS: &str = "/rpc_resp/mempool/pull_txs"; #[derive(Debug, Serialize, Deserialize)] pub struct MsgNewTxs { - #[serde(with = "core_network::serde")] - pub stx: SignedTransaction, + #[serde(with = "core_network::serde_multi")] + pub batch_stxs: Vec, } pub struct NewTxsHandler { @@ -43,7 +44,22 @@ where async fn process(&self, ctx: Context, msg: Self::Message) -> ProtocolResult<()> { let ctx = ctx.mark_network_origin_new_txs(); - self.mem_pool.insert(ctx, msg.stx).await + let insert_stx = |stx| -> _ { + let mem_pool = Arc::clone(&self.mem_pool); + let ctx = ctx.clone(); + + runtime::spawn(async move { mem_pool.insert(ctx, stx).await }) + }; + + // Concurrently insert them + try_join_all( + msg.batch_stxs + .into_iter() + .map(insert_stx) + .collect::>(), + ) + .await + .map(|_| ()) } } diff --git a/core/mempool/src/adapter/mod.rs b/core/mempool/src/adapter/mod.rs index 71067a4ff..cff26764a 100644 --- a/core/mempool/src/adapter/mod.rs +++ b/core/mempool/src/adapter/mod.rs @@ -1,21 +1,32 @@ pub mod message; use std::{ + error::Error, marker::PhantomData, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, + sync::Arc, + time::Duration, }; use async_trait::async_trait; +use derive_more::Display; +use futures::{ + channel::mpsc::{ + channel, unbounded, Receiver, Sender, TrySendError, UnboundedReceiver, UnboundedSender, + }, + lock::Mutex, + select, + stream::StreamExt, +}; +use futures_timer::Delay; +use log::{debug, error}; use common_crypto::Crypto; use protocol::{ fixed_codec::ProtocolFixedCodec, traits::{Context, Gossip, MemPoolAdapter, Priority, Rpc, Storage}, types::{Hash, SignedTransaction}, - ProtocolResult, + ProtocolError, ProtocolErrorKind, ProtocolResult, }; use crate::adapter::message::{ @@ -23,27 +34,153 @@ use crate::adapter::message::{ }; use crate::MemPoolError; +pub const DEFAULT_BROADCAST_TXS_SIZE: usize = 200; +pub const DEFAULT_BROADCAST_TXS_INTERVAL: u64 = 200; // milliseconds + +struct IntervalTxsBroadcaster; + +impl IntervalTxsBroadcaster { + pub async fn broadcast( + stx_rx: UnboundedReceiver, + interval_reached: Receiver<()>, + tx_size: usize, + gossip: G, + err_tx: UnboundedSender, + ) where + G: Gossip + Clone + Unpin + 'static, + { + let mut stx_rx = stx_rx.fuse(); + let mut interval_rx = interval_reached.fuse(); + + let mut txs_cache = Vec::with_capacity(tx_size); + + loop { + select! { + opt_stx = stx_rx.next() => { + if let Some(stx) = opt_stx { + txs_cache.push(stx); + + if txs_cache.len() == tx_size { + Self::do_broadcast(&mut txs_cache, &gossip, err_tx.clone()).await + } + } else { + debug!("mempool: default mempool adapter dropped") + } + }, + signal = interval_rx.next() => { + if signal.is_some() { + Self::do_broadcast(&mut txs_cache, &gossip, err_tx.clone()).await + } + }, + complete => break, + }; + } + } + + pub async fn timer(mut signal_tx: Sender<()>, interval: u64) { + let interval = Duration::from_millis(interval); + + loop { + Delay::new(interval).await; + + if let Err(err) = signal_tx.try_send(()) { + // This means previous interval signal hasn't processed + // yet, simply drop this one. + if err.is_full() { + debug!("mempool: interval signal channel full"); + } + + if err.is_disconnected() { + error!("mempool: interval broadcaster dropped"); + } + } + } + } + + async fn do_broadcast( + txs_cache: &mut Vec, + gossip: &G, + err_tx: UnboundedSender, + ) where + G: Gossip + Unpin, + { + if txs_cache.is_empty() { + return; + } + + let batch_stxs = txs_cache.drain(..).collect::>(); + let gossip_msg = MsgNewTxs { batch_stxs }; + + let ctx = Context::new(); + let end = END_GOSSIP_NEW_TXS; + + let report_if_err = move |ret: ProtocolResult<()>| { + if let Err(err) = ret { + if err_tx.unbounded_send(err).is_err() { + error!("mempool: default mempool adapter dropped"); + } + } + }; + + report_if_err( + gossip + .broadcast(ctx, end, gossip_msg, Priority::Normal) + .await, + ) + } +} + pub struct DefaultMemPoolAdapter { network: N, storage: Arc, timeout_gap: AtomicU64, + stx_tx: UnboundedSender, + err_rx: Mutex>, + pin_c: PhantomData, } impl DefaultMemPoolAdapter where C: Crypto, - N: Rpc + Gossip, + N: Rpc + Gossip + Clone + Unpin + 'static, S: Storage, { - pub fn new(network: N, storage: Arc, timeout_gap: u64) -> Self { + pub fn new( + network: N, + storage: Arc, + timeout_gap: u64, + broadcast_txs_size: usize, + broadcast_txs_interval: u64, + ) -> Self { + let (stx_tx, stx_rx) = unbounded(); + let (err_tx, err_rx) = unbounded(); + let (signal_tx, interval_reached) = channel(1); + + runtime::spawn(IntervalTxsBroadcaster::timer( + signal_tx, + broadcast_txs_interval, + )); + + runtime::spawn(IntervalTxsBroadcaster::broadcast( + stx_rx, + interval_reached, + broadcast_txs_size, + network.clone(), + err_tx, + )); + DefaultMemPoolAdapter { network, storage, + timeout_gap: AtomicU64::new(timeout_gap), + stx_tx, + err_rx: Mutex::new(err_rx), + pin_c: PhantomData, } } @@ -53,7 +190,7 @@ where impl MemPoolAdapter for DefaultMemPoolAdapter where C: Crypto + Send + Sync + 'static, - N: Rpc + Gossip + 'static, + N: Rpc + Gossip + Clone + Unpin + 'static, S: Storage + 'static, { async fn pull_txs( @@ -71,12 +208,20 @@ where Ok(resp_msg.sig_txs) } - async fn broadcast_tx(&self, ctx: Context, stx: SignedTransaction) -> ProtocolResult<()> { - let gossip_msg = MsgNewTxs { stx }; + async fn broadcast_tx(&self, _ctx: Context, stx: SignedTransaction) -> ProtocolResult<()> { + self.stx_tx + .unbounded_send(stx) + .map_err(AdapterError::from)?; - self.network - .broadcast(ctx, END_GOSSIP_NEW_TXS, gossip_msg, Priority::Normal) - .await + if let Some(mut err_rx) = self.err_rx.try_lock() { + match err_rx.try_next() { + Ok(Some(err)) => return Err(err), + // Error means receiver channel is empty, is ok here + Ok(None) | Err(_) => return Ok(()), + } + } + + Ok(()) } async fn check_signature(&self, _ctx: Context, tx: SignedTransaction) -> ProtocolResult<()> { @@ -162,3 +307,224 @@ where Ok(epoch_id) } } + +#[derive(Debug, Display)] +pub enum AdapterError { + #[display(fmt = "adapter: interval broadcaster drop")] + IntervalBroadcasterDrop, +} + +impl Error for AdapterError {} + +impl From> for AdapterError { + fn from(_error: TrySendError) -> AdapterError { + AdapterError::IntervalBroadcasterDrop + } +} + +impl From for ProtocolError { + fn from(error: AdapterError) -> ProtocolError { + ProtocolError::new(ProtocolErrorKind::Mempool, Box::new(error)) + } +} + +#[cfg(test)] +mod tests { + use super::IntervalTxsBroadcaster; + + use crate::{adapter::message::MsgNewTxs, tests::default_mock_txs}; + + use protocol::{ + traits::{Context, Gossip, MessageCodec, Priority}, + types::UserAddress, + ProtocolResult, + }; + + use async_trait::async_trait; + use bytes::Bytes; + use futures::{ + channel::mpsc::{channel, unbounded, UnboundedSender}, + stream::StreamExt, + }; + use parking_lot::Mutex; + + use std::{ + ops::Sub, + sync::Arc, + time::{Duration, Instant}, + }; + + #[derive(Clone)] + struct MockGossip { + msgs: Arc>>, + signal_tx: UnboundedSender<()>, + } + + impl MockGossip { + pub fn new(signal_tx: UnboundedSender<()>) -> Self { + MockGossip { + msgs: Default::default(), + signal_tx, + } + } + } + + #[async_trait] + impl Gossip for MockGossip { + async fn broadcast( + &self, + _: Context, + _: &str, + mut msg: M, + _: Priority, + ) -> ProtocolResult<()> + where + M: MessageCodec, + { + let bytes = msg.encode().await.expect("encode message fail"); + self.msgs.lock().push(bytes); + + self.signal_tx + .unbounded_send(()) + .expect("send broadcast signal fail"); + + Ok(()) + } + + async fn users_cast( + &self, + _: Context, + _: &str, + _: Vec, + _: M, + _: Priority, + ) -> ProtocolResult<()> + where + M: MessageCodec, + { + unreachable!() + } + } + + macro_rules! pop_msg { + ($msgs:expr) => {{ + let msg = $msgs.pop().expect("should have one message"); + MsgNewTxs::decode(msg).await.expect("decode MsgNewTxs fail") + }}; + } + + #[runtime::test(runtime_tokio::Tokio)] + async fn test_interval_timer() { + let (tx, mut rx) = channel(1); + let interval = Duration::from_millis(200); + let now = Instant::now(); + + runtime::spawn(IntervalTxsBroadcaster::timer(tx, 200)); + rx.next().await.expect("await interval signal fail"); + + assert!(now.elapsed().sub(interval).as_millis() < 100u128); + } + + #[runtime::test(runtime_tokio::Tokio)] + async fn test_interval_broadcast_reach_cache_size() { + let (stx_tx, stx_rx) = unbounded(); + let (err_tx, _err_rx) = unbounded(); + let (_signal_tx, interval_reached) = channel(1); + let tx_size = 10; + let (broadcast_signal_tx, mut broadcast_signal_rx) = unbounded(); + let gossip = MockGossip::new(broadcast_signal_tx); + + runtime::spawn(IntervalTxsBroadcaster::broadcast( + stx_rx, + interval_reached, + tx_size, + gossip.clone(), + err_tx, + )); + + for stx in default_mock_txs(11).into_iter() { + stx_tx.unbounded_send(stx).expect("send stx fail"); + } + + broadcast_signal_rx.next().await; + let mut msgs = gossip.msgs.lock().drain(..).collect::>(); + assert_eq!(msgs.len(), 1, "should only have one message"); + + let msg = pop_msg!(msgs); + assert_eq!(msg.batch_stxs.len(), 10, "should only have 10 stx"); + } + + #[runtime::test(runtime_tokio::Tokio)] + async fn test_interval_broadcast_reach_interval() { + let (stx_tx, stx_rx) = unbounded(); + let (err_tx, _err_rx) = unbounded(); + let (signal_tx, interval_reached) = channel(1); + let tx_size = 10; + let (broadcast_signal_tx, mut broadcast_signal_rx) = unbounded(); + let gossip = MockGossip::new(broadcast_signal_tx); + + runtime::spawn(IntervalTxsBroadcaster::timer(signal_tx, 200)); + runtime::spawn(IntervalTxsBroadcaster::broadcast( + stx_rx, + interval_reached, + tx_size, + gossip.clone(), + err_tx, + )); + + for stx in default_mock_txs(9).into_iter() { + stx_tx.unbounded_send(stx).expect("send stx fail"); + } + + broadcast_signal_rx.next().await; + let mut msgs = gossip.msgs.lock().drain(..).collect::>(); + assert_eq!(msgs.len(), 1, "should only have one message"); + + let msg = pop_msg!(msgs); + assert_eq!(msg.batch_stxs.len(), 9, "should only have 9 stx"); + } + + #[runtime::test(runtime_tokio::Tokio)] + async fn test_interval_broadcast() { + let (stx_tx, stx_rx) = unbounded(); + let (err_tx, _err_rx) = unbounded(); + let (signal_tx, interval_reached) = channel(1); + let tx_size = 10; + let (broadcast_signal_tx, mut broadcast_signal_rx) = unbounded(); + let gossip = MockGossip::new(broadcast_signal_tx); + + runtime::spawn(IntervalTxsBroadcaster::timer(signal_tx, 200)); + runtime::spawn(IntervalTxsBroadcaster::broadcast( + stx_rx, + interval_reached, + tx_size, + gossip.clone(), + err_tx, + )); + + for stx in default_mock_txs(19).into_iter() { + stx_tx.unbounded_send(stx).expect("send stx fail"); + } + + // Should got two broadcast + broadcast_signal_rx.next().await; + broadcast_signal_rx.next().await; + + let mut msgs = gossip.msgs.lock().drain(..).collect::>(); + assert_eq!(msgs.len(), 2, "should only have two messages"); + + let msg = pop_msg!(msgs); + assert_eq!( + msg.batch_stxs.len(), + 9, + "last message should only have 9 stx" + ); + + let msg = pop_msg!(msgs); + assert_eq!( + msg.batch_stxs.len(), + 10, + "first message should only have 10 stx" + ); + } +} diff --git a/core/mempool/src/lib.rs b/core/mempool/src/lib.rs index 57a7f7abe..63b2ab6fb 100644 --- a/core/mempool/src/lib.rs +++ b/core/mempool/src/lib.rs @@ -11,6 +11,7 @@ pub use adapter::message::{ NewTxsHandler, PullTxsHandler, END_GOSSIP_NEW_TXS, END_RESP_PULL_TXS, END_RPC_PULL_TXS, }; pub use adapter::DefaultMemPoolAdapter; +pub use adapter::{DEFAULT_BROADCAST_TXS_INTERVAL, DEFAULT_BROADCAST_TXS_SIZE}; use std::error::Error; diff --git a/core/mempool/src/tests/mod.rs b/core/mempool/src/tests/mod.rs index f1edd1760..8665fa7d7 100644 --- a/core/mempool/src/tests/mod.rs +++ b/core/mempool/src/tests/mod.rs @@ -88,7 +88,7 @@ impl MemPoolAdapter for HashMemPoolAdapter { } } -fn default_mock_txs(size: usize) -> Vec { +pub fn default_mock_txs(size: usize) -> Vec { mock_txs(size, 0, TIMEOUT) } diff --git a/devtools/chain/config.toml b/devtools/chain/config.toml index 517f8b63a..b7325fe84 100644 --- a/devtools/chain/config.toml +++ b/devtools/chain/config.toml @@ -22,6 +22,8 @@ address = "0.0.0.0:1888" [mempool] timeout_gap = 20 pool_size = 20000 +broadcast_txs_size = 200 +broadcast_txs_interval = 200 [consensus] cycles_limit = 99999999 diff --git a/src/config.rs b/src/config.rs index 84e075b47..10fa6e73b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use serde_derive::Deserialize; use core_consensus::DurationConfig; +use core_mempool::{DEFAULT_BROADCAST_TXS_INTERVAL, DEFAULT_BROADCAST_TXS_SIZE}; #[derive(Debug, Deserialize)] pub struct ConfigGraphQL { @@ -24,10 +25,23 @@ pub struct ConfigNetworkBootstrap { pub address: SocketAddr, } +fn default_broadcast_txs_size() -> usize { + DEFAULT_BROADCAST_TXS_SIZE +} + +fn default_broadcast_txs_interval() -> u64 { + DEFAULT_BROADCAST_TXS_INTERVAL +} + #[derive(Debug, Deserialize)] pub struct ConfigMempool { pub timeout_gap: u64, pub pool_size: u64, + + #[serde(default = "default_broadcast_txs_size")] + pub broadcast_txs_size: usize, + #[serde(default = "default_broadcast_txs_interval")] + pub broadcast_txs_interval: u64, } #[derive(Debug, Deserialize)] diff --git a/src/main.rs b/src/main.rs index 358553251..8551e02b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -191,6 +191,8 @@ async fn start(cfg: &Config) -> ProtocolResult<()> { network_service.handle(), Arc::clone(&storage), cfg.mempool.timeout_gap, + cfg.mempool.broadcast_txs_size, + cfg.mempool.broadcast_txs_interval, ); let mempool = Arc::new(HashMemPool::new( cfg.mempool.pool_size as usize,