Skip to content

Commit

Permalink
refactor(bft): Remove thread pool. (nervosnetwork#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
yejiayu authored Jun 10, 2019
1 parent 56d8649 commit 3204f3e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 23 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ core-serialization = { path = "../serialization" }
core-pubsub = { path = "../pubsub" }

parking_lot = "0.8"
num_cpus = "1.0"
log = "0.4"
tokio = "0.1"
futures-timer = "0.1"
Expand Down
25 changes: 6 additions & 19 deletions core/consensus/src/bft/support.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::cmp;
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;
Expand All @@ -7,7 +6,7 @@ use bft_rs::{
Address as BFTAddress, BftMsg, BftSupport, Commit, Node as BftNode, Signature,
Status as BftStatus,
};
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures::executor::block_on;
use parking_lot::RwLock;

use core_context::{Context, P2P_SESSION_ID};
Expand All @@ -34,9 +33,6 @@ where
N: Network + 'static,
{
engine: Arc<Engine<E, T, S, C>>,
// Because "bft-rs" is not in the futures runtime,
// to ensure performance use a separate thread pool to run the futures in "support".
thread_pool: ThreadPool,

network: N,
proposal_origin: RwLock<HashMap<Hash, ProposalOriginValue>>,
Expand All @@ -51,14 +47,8 @@ where
N: Network + 'static,
{
pub(crate) fn new(engine: Arc<Engine<E, T, S, C>>, network: N) -> ConsensusResult<Self> {
let thread_pool = ThreadPoolBuilder::new()
.pool_size(cmp::max(4, num_cpus::get() / 4))
.create()
.map_err(|e| ConsensusError::Internal(e.to_string()))?;

Ok(Self {
engine,
thread_pool,
network,

proposal_origin: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -116,8 +106,7 @@ where
Ok(())
};

let mut pool = self.thread_pool.clone();
pool.run(fut)
block_on(fut)
}

/// A user-defined function for transactions validation.
Expand Down Expand Up @@ -163,8 +152,7 @@ where
Ok(())
};

let mut pool = self.thread_pool.clone();
pool.run(fut)
block_on(fut)
}

/// A user-defined function for transmitting signed_proposals and
Expand Down Expand Up @@ -231,8 +219,7 @@ where
})
};

let mut pool = self.thread_pool.clone();
pool.run(fut)
block_on(fut)
}

/// A user-defined function for feeding the bft consensus.
Expand All @@ -246,8 +233,8 @@ where
let encoded = AsyncCodec::encode(ser_proposal).await?;
Ok((encoded, proposal_hash.as_bytes().to_vec()))
};
let mut pool = self.thread_pool.clone();
pool.run(fut)

block_on(fut)
}

/// A user-defined function for signing a [`hash`].
Expand Down
8 changes: 6 additions & 2 deletions core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ where
proposal: Proposal,
latest_proof: Proof,
) -> ConsensusResult<ConsensusStatus> {
let _lock = self.lock.lock().await;
if self.lock.try_lock().is_none() {
return Err(ConsensusError::Internal("locked in sync".to_owned()));
}

let status = self.get_status();
if status.height + 1 != proposal.height {
Expand Down Expand Up @@ -332,7 +334,9 @@ where
signed_txs: Vec<SignedTransaction>,
proof: Proof,
) -> ConsensusResult<ConsensusStatus> {
let _lock = self.lock.lock().await;
if self.lock.try_lock().is_none() {
return Err(ConsensusError::Internal("locked in consensus".to_owned()));
}

let status = self.get_status();
if status.height + 1 != block.header.height {
Expand Down

0 comments on commit 3204f3e

Please sign in to comment.