Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Offload cull to IoWorker. #9099

Merged
merged 1 commit into from
Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl ClientService {

let pruning = config.pruning;
let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?;
miner.set_io_channel(io_service.channel());

let snapshot_params = SnapServiceParams {
engine: spec.engine.clone(),
Expand Down
23 changes: 15 additions & 8 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub struct Client {

/// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`.
liveness: AtomicBool,
io_channel: Mutex<IoChannel<ClientIoMessage>>,
io_channel: RwLock<IoChannel<ClientIoMessage>>,

/// List of actors to be notified on certain chain events
notify: RwLock<Vec<Weak<ChainNotify>>>,
Expand Down Expand Up @@ -761,7 +761,7 @@ impl Client {
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel),
io_channel: RwLock::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
Expand Down Expand Up @@ -995,7 +995,7 @@ impl Client {

/// Replace io channel. Useful for testing.
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.lock() = io_channel;
*self.io_channel.write() = io_channel;
}

/// Get a copy of the best block's state.
Expand Down Expand Up @@ -2011,7 +2011,7 @@ impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
trace_time!("queue_transactions");
let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
self.queue_transactions.queue(&self.io_channel.read(), len, move |client| {
trace_time!("import_queued_transactions");

let txs: Vec<UnverifiedTransaction> = transactions
Expand Down Expand Up @@ -2060,7 +2060,7 @@ impl IoClient for Client {

let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
Expand Down Expand Up @@ -2092,7 +2092,7 @@ impl IoClient for Client {
}

fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e);
}
Expand Down Expand Up @@ -2202,7 +2202,14 @@ impl ImportSealedBlock for Client {
route
};
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), self.engine.seals_internally().is_some());
self.importer.miner.chain_new_blocks(
self,
&[h.clone()],
&[],
route.enacted(),
route.retracted(),
self.engine.seals_internally().is_some(),
);
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
Expand Down Expand Up @@ -2526,7 +2533,7 @@ impl IoChannelQueue {
}
}

pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
Expand Down
44 changes: 41 additions & 3 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStat
#[cfg(feature = "work-notify")]
use ethcore_miner::work_notify::NotifyWork;
use ethereum_types::{H256, U256, Address};
use io::IoChannel;
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use transaction::{
Expand All @@ -44,7 +45,7 @@ use block::{ClosedBlock, IsBlock, Block, SealedBlock};
use client::{
BlockChain, ChainInfo, CallContract, BlockProducer, SealedBlockImporter, Nonce
};
use client::BlockId;
use client::{BlockId, ClientIoMessage};
use executive::contract_address;
use header::{Header, BlockNumber};
use miner;
Expand Down Expand Up @@ -211,6 +212,7 @@ pub struct Miner {
transaction_queue: Arc<TransactionQueue>,
engine: Arc<EthEngine>,
accounts: Option<Arc<AccountProvider>>,
io_channel: RwLock<Option<IoChannel<ClientIoMessage>>>,
}

impl Miner {
Expand All @@ -227,7 +229,12 @@ impl Miner {
}

/// Creates new instance of miner Arc.
pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option<Arc<AccountProvider>>) -> Self {
pub fn new(
options: MinerOptions,
gas_pricer: GasPricer,
spec: &Spec,
accounts: Option<Arc<AccountProvider>>,
) -> Self {
let limits = options.pool_limits.clone();
let verifier_options = options.pool_verification_options.clone();
let tx_queue_strategy = options.tx_queue_strategy;
Expand All @@ -251,6 +258,7 @@ impl Miner {
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts,
engine: spec.engine.clone(),
io_channel: RwLock::new(None),
}
}

Expand All @@ -270,6 +278,11 @@ impl Miner {
}, GasPricer::new_fixed(minimal_gas_price), spec, accounts)
}

/// Sets `IoChannel`
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.write() = Some(io_channel);
}

/// Clear all pending block states
pub fn clear(&self) {
self.sealing.lock().queue.reset();
Expand Down Expand Up @@ -1176,7 +1189,32 @@ impl miner::MinerService for Miner {
// (thanks to Ready), but culling can take significant amount of time,
// so best to leave it after we create some work for miners to prevent increased
// uncle rate.
self.transaction_queue.cull(client);
// If the io_channel is available attempt to offload culling to a separate task
// to avoid blocking chain_new_blocks
if let Some(ref channel) = *self.io_channel.read() {
let queue = self.transaction_queue.clone();
let nonce_cache = self.nonce_cache.clone();
let engine = self.engine.clone();
let accounts = self.accounts.clone();
let refuse_service_transactions = self.options.refuse_service_transactions;

let cull = move |chain: &::client::Client| {
let client = PoolClient::new(
chain,
&nonce_cache,
&*engine,
accounts.as_ref().map(|x| &**x),
refuse_service_transactions,
);
queue.cull(client);
};

if let Err(e) = channel.send(ClientIoMessage::execute(cull)) {
warn!(target: "miner", "Error queueing cull: {:?}", e);
}
} else {
self.transaction_queue.cull(client);
}
}
}

Expand Down
13 changes: 8 additions & 5 deletions ethcore/src/miner/pool_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

//! Blockchain access for transaction pool.

use std::fmt;
use std::collections::HashMap;
use std::{
collections::HashMap,
fmt,
sync::Arc,
};

use ethereum_types::{H256, U256, Address};
use ethcore_miner::pool;
Expand All @@ -37,17 +40,17 @@ use miner;
use miner::service_transaction_checker::ServiceTransactionChecker;

/// Cache for state nonces.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NonceCache {
nonces: RwLock<HashMap<Address, U256>>,
nonces: Arc<RwLock<HashMap<Address, U256>>>,
limit: usize
}

impl NonceCache {
/// Create new cache with a limit of `limit` entries.
pub fn new(limit: usize) -> Self {
NonceCache {
nonces: RwLock::new(HashMap::with_capacity(limit / 2)),
nonces: Arc::new(RwLock::new(HashMap::with_capacity(limit / 2))),
limit,
}
}
Expand Down
3 changes: 2 additions & 1 deletion parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
cmd.miner_options,
cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()),
&spec,
Some(account_provider.clone())
Some(account_provider.clone()),

));
miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed");
miner.set_gas_range_target(cmd.miner_extras.gas_range_target);
Expand Down