diff --git a/common/channel/src/lib.rs b/common/channel/src/lib.rs index 441e90123..bd88f6ced 100644 --- a/common/channel/src/lib.rs +++ b/common/channel/src/lib.rs @@ -7,7 +7,7 @@ use std::{marker::Unpin, pin::Pin}; use futures::prelude::{Sink, Stream}; use futures::task::{AtomicWaker, Context, Poll}; -pub use crossbeam_channel::{TryRecvError, TrySendError}; +pub use crossbeam_channel::{RecvError, TryRecvError, TrySendError}; pub struct Sender { inner: crossbeam_channel::Sender, @@ -103,6 +103,10 @@ impl Receiver { pub fn try_recv(&self) -> Result { self.inner.try_recv() } + + pub fn recv(&self) -> Result { + self.inner.recv() + } } pub fn bounded(cap: usize) -> (Sender, Receiver) { diff --git a/components/transaction-pool/src/lib.rs b/components/transaction-pool/src/lib.rs index 8f5262719..bb1bc41d3 100644 --- a/components/transaction-pool/src/lib.rs +++ b/components/transaction-pool/src/lib.rs @@ -319,7 +319,7 @@ fn cache_broadcast_txs(network: N, receiver: Receiver { buffer_txs.push(tx); } diff --git a/core/network/src/outbound/consensus.rs b/core/network/src/outbound/consensus.rs index 39f8d22d4..216cb7c44 100644 --- a/core/network/src/outbound/consensus.rs +++ b/core/network/src/outbound/consensus.rs @@ -1,5 +1,3 @@ -use futures::prelude::{FutureExt, TryFutureExt}; - use core_network_message::consensus::{Proposal, Vote}; use core_network_message::Method; use core_runtime::network::Consensus; @@ -9,26 +7,14 @@ use crate::{BytesBroadcaster, OutboundHandle}; impl Consensus for OutboundHandle { fn proposal(&self, proposal: Vec) { - let outbound = self.clone(); - - let job = async move { - let proposal = Proposal::from(proposal); - - outbound.silent_broadcast(Method::Proposal, proposal, Mode::Quick); - }; + let proposal = Proposal::from(proposal); - tokio::spawn(job.unit_error().boxed().compat()); + self.silent_broadcast(Method::Proposal, proposal, Mode::Quick); } fn vote(&self, vote: Vec) { - let outbound = self.clone(); - - let job = async move { - let vote = Vote::from(vote); - - outbound.silent_broadcast(Method::Vote, vote, Mode::Quick); - }; + let vote = Vote::from(vote); - tokio::spawn(job.unit_error().boxed().compat()); + self.silent_broadcast(Method::Vote, vote, Mode::Quick); } } diff --git a/core/network/src/outbound/sync.rs b/core/network/src/outbound/sync.rs index 70f0ab4e4..bfcf7411b 100644 --- a/core/network/src/outbound/sync.rs +++ b/core/network/src/outbound/sync.rs @@ -1,4 +1,4 @@ -use futures::prelude::{FutureExt, TryFutureExt}; +use futures::prelude::TryFutureExt; use core_context::Context; use core_network_message::common::PullTxs; @@ -13,15 +13,9 @@ use crate::{BytesBroadcaster, OutboundHandle}; impl Synchronizer for OutboundHandle { fn broadcast_status(&self, status: SyncStatus) { - let outbound = self.clone(); - - let job = async move { - let data = BroadcastStatus::from(status.hash, status.height); - - outbound.silent_broadcast(Method::SyncBroadcastStatus, data, Mode::Normal); - }; + let data = BroadcastStatus::from(status.hash, status.height); - tokio::spawn(job.unit_error().boxed().compat()); + self.silent_broadcast(Method::SyncBroadcastStatus, data, Mode::Normal); } fn pull_blocks(&self, ctx: Context, heights: Vec) -> FutSyncResult> { diff --git a/core/network/src/outbound/tx_pool.rs b/core/network/src/outbound/tx_pool.rs index dcd6988c0..beb19e9ed 100644 --- a/core/network/src/outbound/tx_pool.rs +++ b/core/network/src/outbound/tx_pool.rs @@ -1,4 +1,4 @@ -use futures::prelude::{FutureExt, TryFutureExt}; +use futures::prelude::TryFutureExt; use core_runtime::{network::TransactionPool, FutRuntimeResult, TransactionPoolError}; @@ -12,16 +12,10 @@ use crate::{BytesBroadcaster, OutboundHandle}; impl TransactionPool for OutboundHandle { fn broadcast_batch(&self, txs: Vec) { - let outbound = self.clone(); - - let job = async move { - let data = BroadcastTxs::from(txs); - - // TODO: retry ? - outbound.silent_broadcast(Method::BroadcastTxs, data, Mode::Normal); - }; + let data = BroadcastTxs::from(txs); - tokio::spawn(job.unit_error().boxed().compat()); + // TODO: retry ? + self.silent_broadcast(Method::BroadcastTxs, data, Mode::Normal); } fn pull_txs(