diff --git a/Cargo.lock b/Cargo.lock index 9334d01028d..60565f17585 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -694,7 +694,7 @@ dependencies = [ "rlp 0.2.1", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", - "transaction-pool 1.12.0", + "transaction-pool 1.12.1", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2241,7 +2241,7 @@ dependencies = [ "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.0", + "transaction-pool 1.12.1", "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "vm 0.1.0", ] @@ -3422,7 +3422,7 @@ dependencies = [ [[package]] name = "transaction-pool" -version = "1.12.0" +version = "1.12.1" dependencies = [ "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 39f53445ada..179f7090036 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); +/// Max number of transactions in a single packet. +const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; + // minimum interval between updates. const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); @@ -647,7 +650,7 @@ impl LightProtocol { fn propagate_transactions(&self, io: &IoContext) { if self.capabilities.read().tx_relay { return } - let ready_transactions = self.provider.ready_transactions(); + let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE); if ready_transactions.is_empty() { return } trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len()); diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 305ef9b3540..e182f38b8a6 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -173,8 +173,8 @@ impl Provider for TestProvider { }) } - fn ready_transactions(&self) -> Vec { - self.0.client.ready_transactions() + fn ready_transactions(&self, max_len: usize) -> Vec { + self.0.client.ready_transactions(max_len) } } diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 0e518ea7723..e75915e8cf8 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -125,7 +125,7 @@ pub trait Provider: Send + Sync { fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option; /// Provide pending transactions. - fn ready_transactions(&self) -> Vec; + fn ready_transactions(&self, max_len: usize) -> Vec; /// Provide a proof-of-execution for the given transaction proof request. /// Returns a vector of all state items necessary to execute the transaction. @@ -280,8 +280,8 @@ impl Provider for T { .map(|(_, proof)| ::request::ExecutionResponse { items: proof }) } - fn ready_transactions(&self) -> Vec { - BlockChainClient::ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec { + BlockChainClient::ready_transactions(self, max_len) .into_iter() .map(|tx| tx.pending().clone()) .collect() @@ -367,9 +367,12 @@ impl Provider for LightProvider { None } - fn ready_transactions(&self) -> Vec { + fn ready_transactions(&self, max_len: usize) -> Vec { let chain_info = self.chain_info(); - self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + let mut transactions = self.txqueue.read() + .ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp); + transactions.truncate(max_len); + transactions } } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 2d0201067f1..d47d1afa49b 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1956,8 +1956,8 @@ impl BlockChainClient for Client { (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() } - fn ready_transactions(&self) -> Vec> { - self.importer.miner.ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec> { + self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority) } fn signing_chain_id(&self) -> Option { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 4bd76b4495c..620f1af3330 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry; use receipt::{Receipt, LocalizedReceipt, TransactionOutcome}; use error::ImportResult; use vm::Schedule; -use miner::{Miner, MinerService}; +use miner::{self, Miner, MinerService}; use spec::Spec; use types::basic_account::BasicAccount; use types::pruning_info::PruningInfo; @@ -806,8 +806,8 @@ impl BlockChainClient for TestBlockChainClient { self.traces.read().clone() } - fn ready_transactions(&self) -> Vec> { - self.miner.ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec> { + self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority) } fn signing_chain_id(&self) -> Option { None } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index d3a20dcff89..29876fe80a4 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra fn last_hashes(&self) -> LastHashes; /// List all transactions that are allowed into the next block. - fn ready_transactions(&self) -> Vec>; + fn ready_transactions(&self, max_len: usize) -> Vec>; /// Sorted list of transaction gas prices from at least last sample_size blocks. fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus { diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index bf51d0b135b..27c94dd8b4b 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -364,18 +364,28 @@ impl Miner { let client = self.pool_client(chain); let engine_params = self.engine.params(); - let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); + let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); let nonce_cap: Option = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition { Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into()) } else { None }; + // we will never need more transactions than limit divided by min gas + let max_transactions = if min_tx_gas.is_zero() { + usize::max_value() + } else { + (*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize + }; let pending: Vec> = self.transaction_queue.pending( client.clone(), - chain_info.best_block_number, - chain_info.best_block_timestamp, - nonce_cap, + pool::PendingSettings { + block_number: chain_info.best_block_number, + current_timestamp: chain_info.best_block_timestamp, + nonce_cap, + max_len: max_transactions, + ordering: miner::PendingOrdering::Priority, + } ); let took_ms = |elapsed: &Duration| { @@ -805,20 +815,28 @@ impl miner::MinerService for Miner { self.transaction_queue.all_transactions() } - fn ready_transactions(&self, chain: &C) -> Vec> where + fn ready_transactions(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering) + -> Vec> + where C: ChainInfo + Nonce + Sync, { let chain_info = chain.chain_info(); let from_queue = || { + // We propagate transactions over the nonce cap. + // The mechanism is only to limit number of transactions in pending block + // those transactions are valid and will just be ready to be included in next block. + let nonce_cap = None; + self.transaction_queue.pending( CachedNonceClient::new(chain, &self.nonce_cache), - chain_info.best_block_number, - chain_info.best_block_timestamp, - // We propagate transactions over the nonce cap. - // The mechanism is only to limit number of transactions in pending block - // those transactions are valid and will just be ready to be included in next block. - None, + pool::PendingSettings { + block_number: chain_info.best_block_number, + current_timestamp: chain_info.best_block_timestamp, + nonce_cap, + max_len, + ordering, + }, ) }; @@ -828,6 +846,7 @@ impl miner::MinerService for Miner { .iter() .map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone())) .map(Arc::new) + .take(max_len) .collect() }, chain_info.best_block_number) }; @@ -1081,7 +1100,7 @@ mod tests { use rustc_hex::FromHex; use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock}; - use miner::MinerService; + use miner::{MinerService, PendingOrdering}; use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts}; use transaction::{Transaction}; @@ -1177,7 +1196,7 @@ mod tests { assert_eq!(res.unwrap(), ()); assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1); assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1); - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); // This method will let us know if pending block was created (before calling that method) assert!(!miner.prepare_pending_block(&client)); } @@ -1196,7 +1215,7 @@ mod tests { assert_eq!(res.unwrap(), ()); assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_receipts(best_block), None); - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] @@ -1215,11 +1234,11 @@ mod tests { assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_receipts(best_block), None); // By default we use PendingSet::AlwaysSealing, so no transactions yet. - assert_eq!(miner.ready_transactions(&client).len(), 0); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0); // This method will let us know if pending block was created (before calling that method) assert!(miner.prepare_pending_block(&client)); // After pending block is created we should see a transaction. - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index dd5f28feb60..631941de611 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -26,6 +26,7 @@ pub mod pool_client; pub mod stratum; pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams}; +pub use ethcore_miner::pool::PendingOrdering; use std::sync::Arc; use std::collections::BTreeMap; @@ -156,10 +157,12 @@ pub trait MinerService : Send + Sync { fn next_nonce(&self, chain: &C, address: &Address) -> U256 where C: Nonce + Sync; - /// Get a list of all ready transactions. + /// Get a list of all ready transactions either ordered by priority or unordered (cheaper). /// /// Depending on the settings may look in transaction pool or only in pending block. - fn ready_transactions(&self, chain: &C) -> Vec> + /// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of + /// transactions. + fn ready_transactions(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec> where C: ChainInfo + Nonce + Sync; /// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet). diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index ccafcf6613c..e18b4db9832 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -30,7 +30,7 @@ use test_helpers::{ use types::filter::Filter; use ethereum_types::{U256, Address}; use kvdb_rocksdb::{Database, DatabaseConfig}; -use miner::Miner; +use miner::{Miner, PendingOrdering}; use spec::Spec; use views::BlockView; use ethkey::KeyPair; @@ -343,12 +343,12 @@ fn does_not_propagate_delayed_transactions() { client.miner().import_own_transaction(&*client, tx0).unwrap(); client.miner().import_own_transaction(&*client, tx1).unwrap(); - assert_eq!(0, client.ready_transactions().len()); - assert_eq!(0, client.miner().ready_transactions(&*client).len()); + assert_eq!(0, client.ready_transactions(10).len()); + assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); push_blocks_to_client(&client, 53, 2, 2); client.flush_queue(); - assert_eq!(2, client.ready_transactions().len()); - assert_eq!(2, client.miner().ready_transactions(&*client).len()); + assert_eq!(2, client.ready_transactions(10).len()); + assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); } #[test] diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index b759fb734a7..56bc579ad88 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -359,6 +359,10 @@ impl SyncProvider for EthSync { } } +const PEERS_TIMER: TimerToken = 0; +const SYNC_TIMER: TimerToken = 1; +const TX_TIMER: TimerToken = 2; + struct SyncProtocolHandler { /// Shared blockchain client. chain: Arc, @@ -373,7 +377,9 @@ struct SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { - io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer"); + io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); + io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); } } @@ -399,12 +405,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } } - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { trace_time!("sync::timeout"); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); - self.sync.write().maintain_peers(&mut io); - self.sync.write().maintain_sync(&mut io); - self.sync.write().propagate_new_transactions(&mut io); + match timer { + PEERS_TIMER => self.sync.write().maintain_peers(&mut io), + SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + TX_TIMER => { + self.sync.write().propagate_new_transactions(&mut io); + }, + _ => warn!("Unknown timer {} triggered.", timer), + } } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 8f0aff7514b..84e6344e688 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64; const MAX_NEW_BLOCK_AGE: BlockNumber = 20; // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation). const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; +// Maximal number of transactions queried from miner to propagate. +// This set is used to diff with transactions known by the peer and +// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`. +const MAX_TRANSACTIONS_TO_QUERY: usize = 4096; // Maximal number of transactions in sent in single packet. const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; // Min number of blocks to be behind for a snapshot sync @@ -1143,7 +1147,7 @@ pub mod tests { use super::{PeerInfo, PeerAsking}; use ethcore::header::*; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo}; - use ethcore::miner::MinerService; + use ethcore::miner::{MinerService, PendingOrdering}; use private_tx::NoopPrivateTxHandler; pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { @@ -1355,7 +1359,7 @@ pub mod tests { let mut io = TestIo::new(&mut client, &ss, &queue, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); - assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1); + assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1); } // We need to update nonce status (because we say that the block has been imported) for h in &[good_blocks[0]] { @@ -1371,7 +1375,7 @@ pub mod tests { } // then - assert_eq!(client.miner.ready_transactions(&client).len(), 1); + assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 4ae0518a537..75cf550f28b 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -33,6 +33,7 @@ use super::{ MAX_PEERS_PROPAGATION, MAX_TRANSACTION_PACKET_SIZE, MAX_TRANSACTIONS_TO_PROPAGATE, + MAX_TRANSACTIONS_TO_QUERY, MIN_PEERS_PROPAGATION, CONSENSUS_DATA_PACKET, NEW_BLOCK_HASHES_PACKET, @@ -114,7 +115,7 @@ impl SyncPropagator { return 0; } - let transactions = io.chain().ready_transactions(); + let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY); if transactions.is_empty() { return 0; } diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 9dc180fefe4..6b61df4de4e 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -30,13 +30,14 @@ extern crate linked_hash_map; extern crate parking_lot; extern crate price_info; extern crate rlp; -extern crate trace_time; extern crate transaction_pool as txpool; #[macro_use] extern crate error_chain; #[macro_use] extern crate log; +#[macro_use] +extern crate trace_time; #[cfg(test)] extern crate rustc_hex; diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 57f813157bd..fd4ef6ef2e8 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -16,7 +16,7 @@ //! Transaction Pool -use ethereum_types::{H256, Address}; +use ethereum_types::{U256, H256, Address}; use heapsize::HeapSizeOf; use transaction; use txpool; @@ -45,6 +45,43 @@ pub enum PrioritizationStrategy { GasPriceOnly, } +/// Transaction ordering when requesting pending set. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PendingOrdering { + /// Get pending transactions ordered by their priority (potentially expensive) + Priority, + /// Get pending transactions without any care of particular ordering (cheaper). + Unordered, +} + +/// Pending set query settings +#[derive(Debug, Clone)] +pub struct PendingSettings { + /// Current block number (affects readiness of some transactions). + pub block_number: u64, + /// Current timestamp (affects readiness of some transactions). + pub current_timestamp: u64, + /// Nonce cap (for dust protection; EIP-168) + pub nonce_cap: Option, + /// Maximal number of transactions in pending the set. + pub max_len: usize, + /// Ordering of transactions. + pub ordering: PendingOrdering, +} + +impl PendingSettings { + /// Get all transactions (no cap or len limit) prioritized. + pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self { + PendingSettings { + block_number, + current_timestamp, + nonce_cap: None, + max_len: usize::max_value(), + ordering: PendingOrdering::Priority, + } + } +} + /// Transaction priority. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum Priority { diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index cf4a956f426..4351087b29b 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -26,7 +26,10 @@ use parking_lot::RwLock; use transaction; use txpool::{self, Verifier}; -use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy}; +use pool::{ + self, scoring, verifier, client, ready, listener, + PrioritizationStrategy, PendingOrdering, PendingSettings, +}; use pool::local_transactions::LocalTransactionsList; type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); @@ -74,6 +77,7 @@ struct CachedPending { nonce_cap: Option, has_local_pending: bool, pending: Option>>, + max_len: usize, } impl CachedPending { @@ -85,6 +89,7 @@ impl CachedPending { has_local_pending: false, pending: None, nonce_cap: None, + max_len: 0, } } @@ -99,6 +104,7 @@ impl CachedPending { block_number: u64, current_timestamp: u64, nonce_cap: Option<&U256>, + max_len: usize, ) -> Option>> { // First check if we have anything in cache. let pending = self.pending.as_ref()?; @@ -123,7 +129,12 @@ impl CachedPending { return None; } - Some(pending.clone()) + // It's fine to just take a smaller subset, but not other way around. + if max_len > self.max_len { + return None; + } + + Some(pending.iter().take(max_len).cloned().collect()) } } @@ -173,7 +184,7 @@ impl TransactionQueue { transactions: Vec, ) -> Vec> { // Run verification - let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import"); + trace_time!("pool::verify_and_import"); let options = self.options.read().clone(); let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone()); @@ -203,13 +214,13 @@ impl TransactionQueue { results } - /// Returns all transactions in the queue ordered by priority. + /// Returns all transactions in the queue without explicit ordering. pub fn all_transactions(&self) -> Vec> { let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready; - self.pool.read().pending(ready).collect() + self.pool.read().unordered_pending(ready).collect() } - /// Returns current pneding transactions. + /// Returns current pending transactions ordered by priority. /// /// NOTE: This may return a cached version of pending transaction set. /// Re-computing the pending set is possible with `#collect_pending` method, @@ -217,24 +228,31 @@ impl TransactionQueue { pub fn pending( &self, client: C, - block_number: u64, - current_timestamp: u64, - nonce_cap: Option, + settings: PendingSettings, ) -> Vec> where C: client::NonceClient, { - - if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) { + let PendingSettings { block_number, current_timestamp, nonce_cap, max_len, ordering } = settings; + if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } // Double check after acquiring write lock let mut cached_pending = self.cached_pending.write(); - if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) { + if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } - let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect()); + // In case we don't have a cached set, but we don't care about order + // just return the unordered set. + if let PendingOrdering::Unordered = ordering { + let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); + return self.pool.read().unordered_pending(ready).take(max_len).collect(); + } + + let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| { + i.take(max_len).collect() + }); *cached_pending = CachedPending { block_number, @@ -242,6 +260,7 @@ impl TransactionQueue { nonce_cap, has_local_pending: self.has_local_pending_transactions(), pending: Some(pending.clone()), + max_len, }; pending @@ -266,15 +285,27 @@ impl TransactionQueue { scoring::NonceAndGasPrice, Listener, >) -> T, + { + debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number); + trace_time!("pool::collect_pending"); + let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); + collect(self.pool.read().pending(ready)) + } + + fn ready( + client: C, + block_number: u64, + current_timestamp: u64, + nonce_cap: Option, + ) -> (ready::Condition, ready::State) where + C: client::NonceClient, { let pending_readiness = ready::Condition::new(block_number, current_timestamp); // don't mark any transactions as stale at this point. let stale_id = None; let state_readiness = ready::State::new(client, stale_id, nonce_cap); - let ready = (pending_readiness, state_readiness); - - collect(self.pool.read().pending(ready)) + (pending_readiness, state_readiness) } /// Culls all stalled transactions from the pool. @@ -415,6 +446,12 @@ impl TransactionQueue { let mut pool = self.pool.write(); (pool.listener_mut().1).0.add(f); } + + /// Check if pending set is cached. + #[cfg(test)] + pub fn is_pending_cached(&self) -> bool { + self.cached_pending.read().pending.is_some() + } } fn convert_error(err: txpool::Error) -> transaction::Error { @@ -440,7 +477,7 @@ mod tests { fn should_get_pending_transactions() { let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly); - let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None); + let pending: Vec<_> = queue.pending(TestClient::default(), PendingSettings::all_prioritized(0, 0)); for tx in pending { assert!(tx.signed().nonce > 0.into()); diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index ac2e6b008e9..ef83db4a901 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -18,7 +18,7 @@ use ethereum_types::U256; use transaction::{self, PendingTransaction}; use txpool; -use pool::{verifier, TransactionQueue, PrioritizationStrategy}; +use pool::{verifier, TransactionQueue, PrioritizationStrategy, PendingSettings, PendingOrdering}; pub mod tx; pub mod client; @@ -108,7 +108,7 @@ fn should_handle_same_transaction_imported_twice_with_different_state_nonces() { // and then there should be only one transaction in current (the one with higher gas_price) assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 1); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); } @@ -133,7 +133,7 @@ fn should_move_all_transactions_from_future() { // then assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); } @@ -207,7 +207,7 @@ fn should_import_txs_from_same_sender() { txq.import(TestClient::new(), txs.local().into_vec()); // then - let top = txq.pending(TestClient::new(), 0 ,0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0 ,0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -229,7 +229,7 @@ fn should_prioritize_local_transactions_within_same_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(client, 0, 0, None); + let top = txq.pending(client, PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); // local should be first assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -251,7 +251,7 @@ fn should_prioritize_reimported_transactions_within_same_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); // retracted should be first assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -270,7 +270,7 @@ fn should_not_prioritize_local_transactions_with_different_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -288,7 +288,7 @@ fn should_put_transaction_to_futures_if_gap_detected() { // then assert_eq!(res, vec![Ok(()), Ok(())]); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 1); assert_eq!(top[0].hash, hash); } @@ -308,9 +308,9 @@ fn should_handle_min_block() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 0); - let top = txq.pending(TestClient::new(), 1, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(1, 0)); assert_eq!(top.len(), 2); } @@ -341,7 +341,7 @@ fn should_move_transactions_if_gap_filled() { let res = txq.import(TestClient::new(), vec![tx, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); // when let res = txq.import(TestClient::new(), vec![tx1.local()]); @@ -349,7 +349,7 @@ fn should_move_transactions_if_gap_filled() { // then assert_eq!(txq.status().status.transaction_count, 3); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3); } #[test] @@ -361,12 +361,12 @@ fn should_remove_transaction() { let res = txq.import(TestClient::default(), vec![tx, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); // when txq.cull(TestClient::new().with_nonce(124)); assert_eq!(txq.status().status.transaction_count, 1); - assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new().with_nonce(125), PendingSettings::all_prioritized(0, 0)).len(), 1); txq.cull(TestClient::new().with_nonce(126)); // then @@ -384,19 +384,19 @@ fn should_move_transactions_to_future_if_gap_introduced() { let res = txq.import(TestClient::new(), vec![tx3, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); let res = txq.import(TestClient::new(), vec![tx].local()); assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 3); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3); // when txq.remove(vec![&hash], true); // then assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -447,7 +447,7 @@ fn should_prefer_current_transactions_when_hitting_the_limit() { assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 1); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 1); assert_eq!(top[0].hash, hash); assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into())); @@ -494,19 +494,19 @@ fn should_accept_same_transaction_twice_if_removed() { let res = txq.import(TestClient::new(), txs.local().into_vec()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2); // when txq.remove(vec![&hash], true); assert_eq!(txq.status().status.transaction_count, 1); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 0); let res = txq.import(TestClient::new(), vec![tx1].local()); assert_eq!(res, vec![Ok(())]); // then assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2); } #[test] @@ -526,8 +526,8 @@ fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() { // then assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20)); - assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2)); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[0].signed().gas_price, U256::from(20)); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[1].signed().gas_price, U256::from(2)); } #[test] @@ -569,7 +569,7 @@ fn should_return_valid_last_nonce_after_cull() { let client = TestClient::new().with_nonce(124); txq.cull(client.clone()); // tx2 should be not be promoted to current - assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0)).len(), 0); // then assert_eq!(txq.next_nonce(client.clone(), &sender), None); @@ -667,7 +667,7 @@ fn should_accept_local_transactions_below_min_gas_price() { assert_eq!(res, vec![Ok(())]); // then - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -685,7 +685,7 @@ fn should_accept_local_service_transaction() { assert_eq!(res, vec![Ok(())]); // then - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -726,15 +726,77 @@ fn should_not_return_transactions_over_nonce_cap() { assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]); // when - let all = txq.pending(TestClient::new(), 0, 0, None); + let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); // This should invalidate the cache! - let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into())); + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: Some(123.into()), + max_len: usize::max_value(), + ordering: PendingOrdering::Priority, + }); // then assert_eq!(all.len(), 3); assert_eq!(limited.len(), 1); } +#[test] +fn should_return_cached_pending_even_if_unordered_is_requested() { + // given + let txq = new_queue(); + let tx1 = Tx::default().signed(); + let (tx2_1, tx2_2)= Tx::default().signed_pair(); + let tx2_1_hash = tx2_1.hash(); + let res = txq.import(TestClient::new(), vec![tx1].unverified()); + assert_eq!(res, vec![Ok(())]); + let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local()); + assert_eq!(res, vec![Ok(()), Ok(())]); + + // when + let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); + assert_eq!(all[0].hash, tx2_1_hash); + assert_eq!(all.len(), 3); + + // This should not invalidate the cache! + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: None, + max_len: 3, + ordering: PendingOrdering::Unordered, + }); + + // then + assert_eq!(all, limited); +} + +#[test] +fn should_return_unordered_and_not_populate_the_cache() { + // given + let txq = new_queue(); + let tx1 = Tx::default().signed(); + let (tx2_1, tx2_2)= Tx::default().signed_pair(); + let res = txq.import(TestClient::new(), vec![tx1].unverified()); + assert_eq!(res, vec![Ok(())]); + let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local()); + assert_eq!(res, vec![Ok(()), Ok(())]); + + // when + // This should not invalidate the cache! + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: None, + max_len: usize::max_value(), + ordering: PendingOrdering::Unordered, + }); + + // then + assert_eq!(limited.len(), 3); + assert!(!txq.is_pending_cached()); +} + #[test] fn should_clear_cache_after_timeout_for_local() { // given @@ -748,12 +810,12 @@ fn should_clear_cache_after_timeout_for_local() { // This should populate cache and set timestamp to 1 // when - assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0); - assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1)).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1000)).len(), 0); // This should invalidate the cache and trigger transaction ready. // then - assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1002)).len(), 2); } #[test] diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index ce30f3cd85e..31af69eaa06 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -304,7 +304,7 @@ impl FullDependencies { let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); let h = client.handler(); self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() { - h.new_transactions(hashes); + h.notify_new_transactions(hashes); })); if let Some(h) = client.handler().upgrade() { @@ -525,7 +525,7 @@ impl LightDependencies { let h = client.handler(); self.transaction_queue.write().add_listener(Box::new(move |transactions| { if let Some(h) = h.upgrade() { - h.new_transactions(transactions); + h.notify_new_transactions(transactions); } })); handler.extend_with(EthPubSub::to_delegate(client)); diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 8a0b689c655..5c36171dece 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -36,7 +36,7 @@ jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = " jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" } ethash = { path = "../ethash" } -ethcore = { path = "../ethcore" } +ethcore = { path = "../ethcore", features = ["test-helpers"] } ethcore-bytes = { path = "../util/bytes" } ethcore-crypto = { path = "../ethcore/crypto" } ethcore-devtools = { path = "../devtools" } diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index bbad2fe27d0..c91070cbfb6 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -85,7 +85,7 @@ impl Filterable for EthFilterClient where } fn pending_transactions_hashes(&self) -> Vec { - self.miner.ready_transactions(&*self.client) + self.miner.ready_transactions(&*self.client, usize::max_value(), miner::PendingOrdering::Priority) .into_iter() .map(|tx| tx.signed().hash()) .collect() diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 11fef2e0bd0..38162e8ea1b 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -175,7 +175,7 @@ impl ChainNotificationHandler { } /// Notify all subscribers about new transaction hashes. - pub fn new_transactions(&self, hashes: &[H256]) { + pub fn notify_new_transactions(&self, hashes: &[H256]) { for subscriber in self.transactions_subscribers.read().values() { for hash in hashes { Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into())); diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs index 91db00ca309..6e93132b922 100644 --- a/rpc/src/v1/impls/light/parity.rs +++ b/rpc/src/v1/impls/light/parity.rs @@ -264,12 +264,13 @@ impl Parity for ParityClient { .map(Into::into) } - fn pending_transactions(&self) -> Result> { + fn pending_transactions(&self, limit: Trailing) -> Result> { let txq = self.light_dispatch.transaction_queue.read(); let chain_info = self.light_dispatch.client.chain_info(); Ok( txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) .into_iter() + .take(limit.unwrap_or_else(usize::max_value)) .map(|tx| Transaction::from_pending(tx, chain_info.best_block_number, self.eip86_transition)) .collect::>() ) diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index f18723eafe1..d7c26014edd 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -303,15 +303,19 @@ impl Parity for ParityClient where .map(Into::into) } - fn pending_transactions(&self) -> Result> { + fn pending_transactions(&self, limit: Trailing) -> Result> { let block_number = self.client.chain_info().best_block_number; - let ready_transactions = self.miner.ready_transactions(&*self.client); + let ready_transactions = self.miner.ready_transactions( + &*self.client, + limit.unwrap_or_else(usize::max_value), + miner::PendingOrdering::Priority, + ); Ok(ready_transactions .into_iter() .map(|t| Transaction::from_pending(t.pending().clone(), block_number, self.eip86_transition)) .collect() - ) + ) } fn all_transactions(&self) -> Result> { diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 90201e346a5..8d0ec23ae1c 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -27,7 +27,7 @@ use ethcore::engines::EthEngine; use ethcore::error::Error; use ethcore::header::{BlockNumber, Header}; use ethcore::ids::BlockId; -use ethcore::miner::{MinerService, AuthoringParams}; +use ethcore::miner::{self, MinerService, AuthoringParams}; use ethcore::receipt::{Receipt, RichReceipt}; use ethereum_types::{H256, U256, Address}; use miner::pool::local_transactions::Status as LocalTransactionStatus; @@ -208,7 +208,7 @@ impl MinerService for TestMinerService { self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect() } - fn ready_transactions(&self, _chain: &C) -> Vec> { + fn ready_transactions(&self, _chain: &C, _max_len: usize, _ordering: miner::PendingOrdering) -> Vec> { self.queued_transactions() } diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index 0d886fe2f1f..30c99fc67ac 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -181,7 +181,7 @@ fn should_subscribe_to_pending_transactions() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Send new transactions - handler.new_transactions(&[5.into(), 7.into()]); + handler.notify_new_transactions(&[5.into(), 7.into()]); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#; diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index f78cf8052eb..1b9a7d09f5b 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -141,7 +141,7 @@ build_rpc_trait! { /// Returns all pending transactions from transaction queue. #[rpc(name = "parity_pendingTransactions")] - fn pending_transactions(&self) -> Result>; + fn pending_transactions(&self, Trailing) -> Result>; /// Returns all transactions from transaction queue. /// diff --git a/transaction-pool/Cargo.toml b/transaction-pool/Cargo.toml index 8965c8cee01..0ba1790a47f 100644 --- a/transaction-pool/Cargo.toml +++ b/transaction-pool/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "Generic transaction pool." name = "transaction-pool" -version = "1.12.0" +version = "1.12.1" license = "GPL-3.0" authors = ["Parity Technologies "] diff --git a/transaction-pool/src/pool.rs b/transaction-pool/src/pool.rs index dcd52a3e7e6..4bbf00ef240 100644 --- a/transaction-pool/src/pool.rs +++ b/transaction-pool/src/pool.rs @@ -15,7 +15,8 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::collections::{HashMap, BTreeSet}; +use std::slice; +use std::collections::{hash_map, HashMap, BTreeSet}; use error; use listener::{Listener, NoopListener}; @@ -416,7 +417,16 @@ impl Pool where PendingIterator { ready, best_transactions, - pool: self + pool: self, + } + } + + /// Returns unprioritized list of ready transactions. + pub fn unordered_pending>(&self, ready: R) -> UnorderedIterator { + UnorderedIterator { + ready, + senders: self.transactions.iter(), + transactions: None, } } @@ -482,6 +492,50 @@ impl Pool where } } +/// An iterator over all pending (ready) transactions in unoredered fashion. +/// +/// NOTE: Current implementation will iterate over all transactions from particular sender +/// ordered by nonce, but that might change in the future. +/// +/// NOTE: the transactions are not removed from the queue. +/// You might remove them later by calling `cull`. +pub struct UnorderedIterator<'a, T, R, S> where + T: VerifiedTransaction + 'a, + S: Scoring + 'a, +{ + ready: R, + senders: hash_map::Iter<'a, T::Sender, Transactions>, + transactions: Option>>, +} + +impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> where + T: VerifiedTransaction, + R: Ready, + S: Scoring, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + loop { + if let Some(transactions) = self.transactions.as_mut() { + if let Some(tx) = transactions.next() { + match self.ready.is_ready(&tx) { + Readiness::Ready => { + return Some(tx.transaction.clone()); + }, + state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state), + } + } + } + + // otherwise fallback and try next sender + let next_sender = self.senders.next()?; + self.transactions = Some(next_sender.1.iter()); + } + } +} + + /// An iterator over all pending (ready) transactions. /// NOTE: the transactions are not removed from the queue. /// You might remove them later by calling `cull`. diff --git a/transaction-pool/src/tests/mod.rs b/transaction-pool/src/tests/mod.rs index 6edd60e60e2..77c25287570 100644 --- a/transaction-pool/src/tests/mod.rs +++ b/transaction-pool/src/tests/mod.rs @@ -250,6 +250,66 @@ fn should_construct_pending() { assert_eq!(pending.next(), None); } +#[test] +fn should_return_unordered_iterator() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap(); + let tx2 = txq.import(b.tx().nonce(2).new()).unwrap(); + let tx3 = txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + txq.import(b.tx().nonce(5).new()).unwrap(); + + let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + // gap + txq.import(b.tx().sender(1).nonce(5).new()).unwrap(); + + let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!(txq.status(NonceReady::default()), Status { + stalled: 0, + pending: 9, + future: 2, + }); + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 3, + pending: 6, + future: 2, + }); + + // when + let all: Vec<_> = txq.unordered_pending(NonceReady::default()).collect(); + + let chain1 = vec![tx0, tx1, tx2, tx3]; + let chain2 = vec![tx5, tx6, tx7, tx8]; + let chain3 = vec![tx9]; + + assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len()); + + let mut options = vec![ + vec![chain1.clone(), chain2.clone(), chain3.clone()], + vec![chain2.clone(), chain1.clone(), chain3.clone()], + vec![chain2.clone(), chain3.clone(), chain1.clone()], + vec![chain3.clone(), chain2.clone(), chain1.clone()], + vec![chain3.clone(), chain1.clone(), chain2.clone()], + vec![chain1.clone(), chain3.clone(), chain2.clone()], + ].into_iter().map(|mut v| { + let mut first = v.pop().unwrap(); + for mut x in v { + first.append(&mut x); + } + first + }); + + assert!(options.any(|opt| all == opt)); +} + #[test] fn should_update_scoring_correctly() { // given