Skip to content

Commit

Permalink
fix: tokio::spawn panic. (nervosnetwork#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
yejiayu authored May 21, 2019
1 parent 42af2ad commit 12d8d01
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 39 deletions.
6 changes: 5 additions & 1 deletion common/channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{marker::Unpin, pin::Pin};
use futures::prelude::{Sink, Stream};
use futures::task::{AtomicWaker, Context, Poll};

pub use crossbeam_channel::{TryRecvError, TrySendError};
pub use crossbeam_channel::{RecvError, TryRecvError, TrySendError};

pub struct Sender<T> {
inner: crossbeam_channel::Sender<T>,
Expand Down Expand Up @@ -103,6 +103,10 @@ impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.inner.try_recv()
}

pub fn recv(&self) -> Result<T, RecvError> {
self.inner.recv()
}
}

pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
Expand Down
2 changes: 1 addition & 1 deletion components/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ fn cache_broadcast_txs<N: Network>(network: N, receiver: Receiver<SignedTransact
network.broadcast_batch(temp);
}

match receiver.try_recv() {
match receiver.recv() {
Ok(tx) => {
buffer_txs.push(tx);
}
Expand Down
22 changes: 4 additions & 18 deletions core/network/src/outbound/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use futures::prelude::{FutureExt, TryFutureExt};

use core_network_message::consensus::{Proposal, Vote};
use core_network_message::Method;
use core_runtime::network::Consensus;
Expand All @@ -9,26 +7,14 @@ use crate::{BytesBroadcaster, OutboundHandle};

impl Consensus for OutboundHandle {
fn proposal(&self, proposal: Vec<u8>) {
let outbound = self.clone();

let job = async move {
let proposal = Proposal::from(proposal);

outbound.silent_broadcast(Method::Proposal, proposal, Mode::Quick);
};
let proposal = Proposal::from(proposal);

tokio::spawn(job.unit_error().boxed().compat());
self.silent_broadcast(Method::Proposal, proposal, Mode::Quick);
}

fn vote(&self, vote: Vec<u8>) {
let outbound = self.clone();

let job = async move {
let vote = Vote::from(vote);

outbound.silent_broadcast(Method::Vote, vote, Mode::Quick);
};
let vote = Vote::from(vote);

tokio::spawn(job.unit_error().boxed().compat());
self.silent_broadcast(Method::Vote, vote, Mode::Quick);
}
}
12 changes: 3 additions & 9 deletions core/network/src/outbound/sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::prelude::{FutureExt, TryFutureExt};
use futures::prelude::TryFutureExt;

use core_context::Context;
use core_network_message::common::PullTxs;
Expand All @@ -13,15 +13,9 @@ use crate::{BytesBroadcaster, OutboundHandle};

impl Synchronizer for OutboundHandle {
fn broadcast_status(&self, status: SyncStatus) {
let outbound = self.clone();

let job = async move {
let data = BroadcastStatus::from(status.hash, status.height);

outbound.silent_broadcast(Method::SyncBroadcastStatus, data, Mode::Normal);
};
let data = BroadcastStatus::from(status.hash, status.height);

tokio::spawn(job.unit_error().boxed().compat());
self.silent_broadcast(Method::SyncBroadcastStatus, data, Mode::Normal);
}

fn pull_blocks(&self, ctx: Context, heights: Vec<u64>) -> FutSyncResult<Vec<Block>> {
Expand Down
14 changes: 4 additions & 10 deletions core/network/src/outbound/tx_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::prelude::{FutureExt, TryFutureExt};
use futures::prelude::TryFutureExt;

use core_runtime::{network::TransactionPool, FutRuntimeResult, TransactionPoolError};

Expand All @@ -12,16 +12,10 @@ use crate::{BytesBroadcaster, OutboundHandle};

impl TransactionPool for OutboundHandle {
fn broadcast_batch(&self, txs: Vec<SignedTransaction>) {
let outbound = self.clone();

let job = async move {
let data = BroadcastTxs::from(txs);

// TODO: retry ?
outbound.silent_broadcast(Method::BroadcastTxs, data, Mode::Normal);
};
let data = BroadcastTxs::from(txs);

tokio::spawn(job.unit_error().boxed().compat());
// TODO: retry ?
self.silent_broadcast(Method::BroadcastTxs, data, Mode::Normal);
}

fn pull_txs(
Expand Down

0 comments on commit 12d8d01

Please sign in to comment.