Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Limit the number of transactions in pending set #8777

Merged
merged 18 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);

/// Max number of transactions in a single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;

// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);

Expand Down Expand Up @@ -647,7 +650,7 @@ impl LightProtocol {
fn propagate_transactions(&self, io: &IoContext) {
if self.capabilities.read().tx_relay { return }

let ready_transactions = self.provider.ready_transactions();
let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE);
if ready_transactions.is_empty() { return }

trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
Expand Down
4 changes: 2 additions & 2 deletions ethcore/light/src/net/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ impl Provider for TestProvider {
})
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
self.0.client.ready_transactions()
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
self.0.client.ready_transactions(max_len)
}
}

Expand Down
13 changes: 8 additions & 5 deletions ethcore/light/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub trait Provider: Send + Sync {
fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;

/// Provide pending transactions.
fn ready_transactions(&self) -> Vec<PendingTransaction>;
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction>;

/// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction.
Expand Down Expand Up @@ -280,8 +280,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
.map(|(_, proof)| ::request::ExecutionResponse { items: proof })
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self, max_len)
.into_iter()
.map(|tx| tx.pending().clone())
.collect()
Expand Down Expand Up @@ -367,9 +367,12 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
None
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
let chain_info = self.chain_info();
self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
let mut transactions = self.txqueue.read()
.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp);
transactions.truncate(max_len);
transactions
}
}

Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1956,8 +1956,8 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
}

fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority)
}

fn signing_chain_id(&self) -> Option<u64> {
Expand Down
6 changes: 3 additions & 3 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry;
use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
use error::ImportResult;
use vm::Schedule;
use miner::{Miner, MinerService};
use miner::{self, Miner, MinerService};
use spec::Spec;
use types::basic_account::BasicAccount;
use types::pruning_info::PruningInfo;
Expand Down Expand Up @@ -806,8 +806,8 @@ impl BlockChainClient for TestBlockChainClient {
self.traces.read().clone()
}

fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority)
}

fn signing_chain_id(&self) -> Option<u64> { None }
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
fn last_hashes(&self) -> LastHashes;

/// List all transactions that are allowed into the next block.
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>>;
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>>;

/// Sorted list of transaction gas prices from at least last sample_size blocks.
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {
Expand Down
51 changes: 35 additions & 16 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,18 +364,28 @@ impl Miner {

let client = self.pool_client(chain);
let engine_params = self.engine.params();
let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let nonce_cap: Option<U256> = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition {
Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into())
} else {
None
};
// we will never need more transactions than limit divided by min gas
let max_transactions = if min_tx_gas.is_zero() {
usize::max_value()
} else {
(*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize
};

let pending: Vec<Arc<_>> = self.transaction_queue.pending(
client.clone(),
chain_info.best_block_number,
chain_info.best_block_timestamp,
nonce_cap,
pool::PendingSettings {
block_number: chain_info.best_block_number,
current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len: max_transactions,
ordering: miner::PendingOrdering::Priority,
}
);

let took_ms = |elapsed: &Duration| {
Expand Down Expand Up @@ -805,20 +815,28 @@ impl miner::MinerService for Miner {
self.transaction_queue.all_transactions()
}

fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>> where
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
-> Vec<Arc<VerifiedTransaction>>
where
C: ChainInfo + Nonce + Sync,
{
let chain_info = chain.chain_info();

let from_queue = || {
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
let nonce_cap = None;

self.transaction_queue.pending(
CachedNonceClient::new(chain, &self.nonce_cache),
chain_info.best_block_number,
chain_info.best_block_timestamp,
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
None,
pool::PendingSettings {
block_number: chain_info.best_block_number,
current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len,
ordering,
},
)
};

Expand All @@ -828,6 +846,7 @@ impl miner::MinerService for Miner {
.iter()
.map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone()))
.map(Arc::new)
.take(max_len)
.collect()
}, chain_info.best_block_number)
};
Expand Down Expand Up @@ -1081,7 +1100,7 @@ mod tests {
use rustc_hex::FromHex;

use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock};
use miner::MinerService;
use miner::{MinerService, PendingOrdering};
use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts};
use transaction::{Transaction};

Expand Down Expand Up @@ -1177,7 +1196,7 @@ mod tests {
assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1);
assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1);
assert_eq!(miner.ready_transactions(&client).len(), 1);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
// This method will let us know if pending block was created (before calling that method)
assert!(!miner.prepare_pending_block(&client));
}
Expand All @@ -1196,7 +1215,7 @@ mod tests {
assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None);
assert_eq!(miner.ready_transactions(&client).len(), 1);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
}

#[test]
Expand All @@ -1215,11 +1234,11 @@ mod tests {
assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None);
// By default we use PendingSet::AlwaysSealing, so no transactions yet.
assert_eq!(miner.ready_transactions(&client).len(), 0);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0);
// This method will let us know if pending block was created (before calling that method)
assert!(miner.prepare_pending_block(&client));
// After pending block is created we should see a transaction.
assert_eq!(miner.ready_transactions(&client).len(), 1);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
}

#[test]
Expand Down
7 changes: 5 additions & 2 deletions ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod pool_client;
pub mod stratum;

pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams};
pub use ethcore_miner::pool::PendingOrdering;

use std::sync::Arc;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -156,10 +157,12 @@ pub trait MinerService : Send + Sync {
fn next_nonce<C>(&self, chain: &C, address: &Address) -> U256
where C: Nonce + Sync;

/// Get a list of all ready transactions.
/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>>
/// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of
/// transactions.
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec<Arc<VerifiedTransaction>>
where C: ChainInfo + Nonce + Sync;

/// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet).
Expand Down
10 changes: 5 additions & 5 deletions ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use test_helpers::{
use types::filter::Filter;
use ethereum_types::{U256, Address};
use kvdb_rocksdb::{Database, DatabaseConfig};
use miner::Miner;
use miner::{Miner, PendingOrdering};
use spec::Spec;
use views::BlockView;
use ethkey::KeyPair;
Expand Down Expand Up @@ -343,12 +343,12 @@ fn does_not_propagate_delayed_transactions() {

client.miner().import_own_transaction(&*client, tx0).unwrap();
client.miner().import_own_transaction(&*client, tx1).unwrap();
assert_eq!(0, client.ready_transactions().len());
assert_eq!(0, client.miner().ready_transactions(&*client).len());
assert_eq!(0, client.ready_transactions(10).len());
assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
push_blocks_to_client(&client, 53, 2, 2);
client.flush_queue();
assert_eq!(2, client.ready_transactions().len());
assert_eq!(2, client.miner().ready_transactions(&*client).len());
assert_eq!(2, client.ready_transactions(10).len());
assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
}

#[test]
Expand Down
21 changes: 16 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ impl SyncProvider for EthSync {
}
}

const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;

struct SyncProtocolHandler {
/// Shared blockchain client.
chain: Arc<BlockChainClient>,
Expand All @@ -373,7 +377,9 @@ struct SyncProtocolHandler {
impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer");
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
}
}

Expand All @@ -399,12 +405,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
}

fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
trace_time!("sync::timeout");
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io);
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}

Expand Down
10 changes: 7 additions & 3 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
// Maximal number of transactions queried from miner to propagate.
// This set is used to diff with transactions known by the peer and
// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`.
const MAX_TRANSACTIONS_TO_QUERY: usize = 4096;
// Maximal number of transactions in sent in single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// Min number of blocks to be behind for a snapshot sync
Expand Down Expand Up @@ -1143,7 +1147,7 @@ pub mod tests {
use super::{PeerInfo, PeerAsking};
use ethcore::header::*;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
use ethcore::miner::MinerService;
use ethcore::miner::{MinerService, PendingOrdering};
use private_tx::NoopPrivateTxHandler;

pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
Expand Down Expand Up @@ -1355,7 +1359,7 @@ pub mod tests {
let mut io = TestIo::new(&mut client, &ss, &queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1);
assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1);
}
// We need to update nonce status (because we say that the block has been imported)
for h in &[good_blocks[0]] {
Expand All @@ -1371,7 +1375,7 @@ pub mod tests {
}

// then
assert_eq!(client.miner.ready_transactions(&client).len(), 1);
assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::{
MAX_PEERS_PROPAGATION,
MAX_TRANSACTION_PACKET_SIZE,
MAX_TRANSACTIONS_TO_PROPAGATE,
MAX_TRANSACTIONS_TO_QUERY,
MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET,
Expand Down Expand Up @@ -114,7 +115,7 @@ impl SyncPropagator {
return 0;
}

let transactions = io.chain().ready_transactions();
let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY);
if transactions.is_empty() {
return 0;
}
Expand Down
Loading