Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Limit the number of transactions in pending set #8777

Merged
merged 18 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);

/// Max number of transactions in a single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;

// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);

Expand Down Expand Up @@ -648,7 +651,7 @@ impl LightProtocol {
fn propagate_transactions(&self, io: &IoContext) {
if self.capabilities.read().tx_relay { return }

let ready_transactions = self.provider.ready_transactions();
let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE);
if ready_transactions.is_empty() { return }

trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
Expand Down
13 changes: 8 additions & 5 deletions ethcore/light/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub trait Provider: Send + Sync {
fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;

/// Provide pending transactions.
fn ready_transactions(&self) -> Vec<PendingTransaction>;
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction>;

/// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction.
Expand Down Expand Up @@ -280,8 +280,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
.map(|(_, proof)| ::request::ExecutionResponse { items: proof })
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self, max_len)
.into_iter()
.map(|tx| tx.pending().clone())
.collect()
Expand Down Expand Up @@ -367,9 +367,12 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
None
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
let chain_info = self.chain_info();
self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
let mut transactions = self.txqueue.read()
.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp);
transactions.truncate(max_len);
transactions
}
}

Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1954,8 +1954,8 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
}

fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority)
}

fn signing_chain_id(&self) -> Option<u64> {
Expand Down
6 changes: 3 additions & 3 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry;
use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
use error::ImportResult;
use vm::Schedule;
use miner::{Miner, MinerService};
use miner::{self, Miner, MinerService};
use spec::Spec;
use types::basic_account::BasicAccount;
use types::mode::Mode;
Expand Down Expand Up @@ -808,8 +808,8 @@ impl BlockChainClient for TestBlockChainClient {
self.traces.read().clone()
}

fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority)
}

fn signing_chain_id(&self) -> Option<u64> { None }
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
fn last_hashes(&self) -> LastHashes;

/// List all transactions that are allowed into the next block.
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>>;
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>>;

/// Sorted list of transaction gas prices from at least last sample_size blocks.
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {
Expand Down
41 changes: 30 additions & 11 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,28 @@ impl Miner {

let client = self.pool_client(chain);
let engine_params = self.engine.params();
let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let nonce_cap: Option<U256> = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition {
Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into())
} else {
None
};
// we will never need more transactions than limit divided by min gas
let max_transactions = if min_tx_gas.is_zero() {
usize::max_value()
} else {
(*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize
};

let pending: Vec<Arc<_>> = self.transaction_queue.pending(
client.clone(),
chain_info.best_block_number,
chain_info.best_block_timestamp,
nonce_cap,
pool::PendingSettings {
block_number: chain_info.best_block_number,
current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len: max_transactions,
ordering: miner::PendingOrdering::Priority,
}
);

let took_ms = |elapsed: &Duration| {
Expand Down Expand Up @@ -807,20 +817,28 @@ impl miner::MinerService for Miner {
self.transaction_queue.all_transactions()
}

fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>> where
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
-> Vec<Arc<VerifiedTransaction>>
where
C: ChainInfo + Nonce + Sync,
{
let chain_info = chain.chain_info();

let from_queue = || {
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
let nonce_cap = None;

self.transaction_queue.pending(
CachedNonceClient::new(chain, &self.nonce_cache),
chain_info.best_block_number,
chain_info.best_block_timestamp,
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
None,
pool::PendingSettings {
block_number: chain_info.best_block_number,
current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len,
ordering,
},
)
};

Expand All @@ -830,6 +848,7 @@ impl miner::MinerService for Miner {
.iter()
.map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone()))
.map(Arc::new)
.take(max_len)
.collect()
}, chain_info.best_block_number)
};
Expand Down
7 changes: 5 additions & 2 deletions ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod pool_client;
pub mod stratum;

pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams};
pub use ethcore_miner::pool::PendingOrdering;

use std::sync::Arc;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -158,10 +159,12 @@ pub trait MinerService : Send + Sync {
fn next_nonce<C>(&self, chain: &C, address: &Address) -> U256
where C: Nonce + Sync;

/// Get a list of all ready transactions.
/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>>
/// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of
/// transactions.
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec<Arc<VerifiedTransaction>>
where C: ChainInfo + Nonce + Sync;

/// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet).
Expand Down
21 changes: 16 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ impl SyncProvider for EthSync {
}
}

const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;

struct SyncProtocolHandler {
/// Shared blockchain client.
chain: Arc<BlockChainClient>,
Expand All @@ -373,7 +377,9 @@ struct SyncProtocolHandler {
impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer");
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering sync timer");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/sync/peers/ in the expect message

io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering sync timer");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expect message should be changed probably.

}
}

Expand All @@ -399,12 +405,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
}

fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
trace_time!("sync::timeout");
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io);
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
// Maximal number of transactions queried from miner to propagate.
// This set is used to diff with transactions known by the peer and
// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`.
const MAX_TRANSACTIONS_TO_QUERY: usize = 1024;
// Maximal number of transactions in sent in single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// Min number of blocks to be behind for a snapshot sync
Expand Down
3 changes: 2 additions & 1 deletion ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::{
MAX_PEERS_PROPAGATION,
MAX_TRANSACTION_PACKET_SIZE,
MAX_TRANSACTIONS_TO_PROPAGATE,
MAX_TRANSACTIONS_TO_QUERY,
MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET,
Expand Down Expand Up @@ -114,7 +115,7 @@ impl SyncPropagator {
return 0;
}

let transactions = io.chain().ready_transactions();
let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY);
if transactions.is_empty() {
return 0;
}
Expand Down
39 changes: 38 additions & 1 deletion miner/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Transaction Pool

use ethereum_types::{H256, Address};
use ethereum_types::{U256, H256, Address};
use heapsize::HeapSizeOf;
use transaction;
use txpool;
Expand Down Expand Up @@ -45,6 +45,43 @@ pub enum PrioritizationStrategy {
GasPriceOnly,
}

/// Transaction ordering when requesting pending set.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PendingOrdering {
/// Get pending transactions ordered by their priority (potentially expensive)
Priority,
/// Get pending transactions without any care of particular ordering (cheaper).
Unordered,
}

/// Pending set query settings
#[derive(Debug, Clone)]
pub struct PendingSettings {
/// Current block number (affects readiness of some transactions).
pub block_number: u64,
/// Current timestamp (affects readiness of some transactions).
pub current_timestamp: u64,
/// Nonce cap (for dust protection; EIP-168)
pub nonce_cap: Option<U256>,
/// Maximal number of transactions in pending the set.
pub max_len: usize,
/// Ordering of transactions.
pub ordering: PendingOrdering,
}

impl PendingSettings {
/// Get all transactions (no cap or len limit) prioritized.
pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self {
PendingSettings {
block_number,
current_timestamp,
nonce_cap: None,
max_len: usize::max_value(),
ordering: PendingOrdering::Priority,
}
}
}

/// Transaction priority.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum Priority {
Expand Down
Loading