Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor pending transactions #466

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
430 changes: 168 additions & 262 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions client/rpc-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

## Unreleased
- Add `FilteredParams::address_in_bloom()` and `FilteredParams::topics_in_bloom()` functions to check the possible existance of Filter addresses or topics in a block.
- Removed `PendingTransaction` and `PendingTransactions` types.
5 changes: 1 addition & 4 deletions client/rpc-core/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ pub use self::sync::{
ChainStatus, EthProtocolInfo, PeerCount, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, Peers,
PipProtocolInfo, SyncInfo, SyncStatus, TransactionStats,
};
pub use self::transaction::{
LocalTransactionStatus, PendingTransaction, PendingTransactions, RichRawTransaction,
Transaction,
};
pub use self::transaction::{LocalTransactionStatus, RichRawTransaction, Transaction};
pub use self::transaction_request::TransactionRequest;
pub use self::work::Work;
20 changes: 0 additions & 20 deletions client/rpc-core/src/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ use crate::types::Bytes;
use ethereum_types::{H160, H256, H512, U256, U64};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

/// Transaction
#[derive(Debug, Default, Clone, PartialEq, Serialize)]
Expand Down Expand Up @@ -159,19 +155,3 @@ pub struct RichRawTransaction {
#[serde(rename = "tx")]
pub transaction: Transaction,
}

pub struct PendingTransaction {
pub transaction: Transaction,
pub at_block: u64,
}

impl PendingTransaction {
pub fn new(transaction: Transaction, at_block: u64) -> Self {
Self {
transaction,
at_block,
}
}
}

pub type PendingTransactions = Option<Arc<Mutex<HashMap<H256, PendingTransaction>>>>;
3 changes: 2 additions & 1 deletion client/rpc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
* Use pallet-ethereum 3.0.0-dev.
* `EthFilterApi::new` takes an additional `backend` parameter.
* Bump `fp-storage` to `2.0.0-dev`.
* Bump `fc-db` to `2.0.0-dev`.
* Bump `fc-db` to `2.0.0-dev`.
* Removed on-memory pending transactions in favor of transaction pool.
1 change: 1 addition & 0 deletions client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sp-io = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate"
sp-runtime = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
sp-api = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
sp-transaction-pool = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
sc-transaction-pool = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
sp-storage = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
sp-blockchain = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
sc-transaction-pool-api = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate" }
Expand Down
139 changes: 51 additions & 88 deletions client/rpc/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ use ethereum::{BlockV0 as EthereumBlock, TransactionV0 as EthereumTransaction};
use ethereum_types::{H160, H256, H512, H64, U256, U64};
use fc_rpc_core::types::{
Block, BlockNumber, BlockTransactions, Bytes, CallRequest, Filter, FilterChanges, FilterPool,
FilterPoolItem, FilterType, FilteredParams, Header, Index, Log, PeerCount, PendingTransaction,
PendingTransactions, Receipt, Rich, RichBlock, SyncInfo, SyncStatus, Transaction,
TransactionRequest, Work,
FilterPoolItem, FilterType, FilteredParams, Header, Index, Log, PeerCount, Receipt, Rich,
RichBlock, SyncInfo, SyncStatus, Transaction, TransactionRequest, Work,
};
use fc_rpc_core::{
EthApi as EthApiT, EthFilterApi as EthFilterApiT, NetApi as NetApiT, Web3Api as Web3ApiT,
Expand All @@ -37,6 +36,7 @@ use sc_client_api::{
client::BlockchainEvents,
};
use sc_network::{ExHashT, NetworkService};
use sc_transaction_pool::{ChainApi, Pool};
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
use sha3::{Digest, Keccak256};
use sp_api::{BlockId, Core, HeaderT, ProvideRuntimeApi};
Expand All @@ -45,7 +45,7 @@ use sp_runtime::{
traits::{BlakeTwo256, Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero},
transaction_validity::TransactionSource,
};
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::{
marker::PhantomData,
sync::{Arc, Mutex},
Expand All @@ -57,33 +57,34 @@ use codec::{self, Decode, Encode};
pub use fc_rpc_core::{EthApiServer, EthFilterApiServer, NetApiServer, Web3ApiServer};
use pallet_ethereum::EthereumStorageSchema;

pub struct EthApi<B: BlockT, C, P, CT, BE, H: ExHashT> {
pub struct EthApi<B: BlockT, C, P, CT, BE, H: ExHashT, A: ChainApi> {
pool: Arc<P>,
graph: Arc<Pool<A>>,
client: Arc<C>,
convert_transaction: CT,
network: Arc<NetworkService<B, H>>,
is_authority: bool,
signers: Vec<Box<dyn EthSigner>>,
overrides: Arc<OverrideHandle<B>>,
pending_transactions: PendingTransactions,
backend: Arc<fc_db::Backend<B>>,
max_past_logs: u32,
_marker: PhantomData<(B, BE)>,
}

impl<B: BlockT, C, P, CT, BE, H: ExHashT> EthApi<B, C, P, CT, BE, H>
impl<B: BlockT, C, P, CT, BE, H: ExHashT, A: ChainApi> EthApi<B, C, P, CT, BE, H, A>
where
C: ProvideRuntimeApi<B>,
C::Api: EthereumRuntimeRPCApi<B>,
B: BlockT<Hash = H256> + Send + Sync + 'static,
A: ChainApi<Block = B> + 'static,
C: Send + Sync + 'static,
{
pub fn new(
client: Arc<C>,
pool: Arc<P>,
graph: Arc<Pool<A>>,
convert_transaction: CT,
network: Arc<NetworkService<B, H>>,
pending_transactions: PendingTransactions,
signers: Vec<Box<dyn EthSigner>>,
overrides: Arc<OverrideHandle<B>>,
backend: Arc<fc_db::Backend<B>>,
Expand All @@ -93,12 +94,12 @@ where
Self {
client: client.clone(),
pool,
graph,
convert_transaction,
network,
is_authority,
signers,
overrides,
pending_transactions,
backend,
max_past_logs,
_marker: PhantomData,
Expand Down Expand Up @@ -408,7 +409,7 @@ fn filter_block_logs<'a>(
ret
}

impl<B, C, P, CT, BE, H: ExHashT> EthApiT for EthApi<B, C, P, CT, BE, H>
impl<B, C, P, CT, BE, H: ExHashT, A> EthApiT for EthApi<B, C, P, CT, BE, H, A>
where
C: ProvideRuntimeApi<B> + StorageProvider<B, BE> + AuxStore,
C: HeaderBackend<B> + HeaderMetadata<B, Error = BlockChainError> + 'static,
Expand All @@ -418,6 +419,7 @@ where
B: BlockT<Hash = H256> + Send + Sync + 'static,
C: Send + Sync + 'static,
P: TransactionPool<Block = B> + Send + Sync + 'static,
A: ChainApi<Block = B> + 'static,
CT: ConvertTransaction<<B as BlockT>::Extrinsic> + Send + Sync + 'static,
{
fn protocol_version(&self) -> Result<u64> {
Expand Down Expand Up @@ -796,8 +798,6 @@ where
let transaction_hash =
H256::from_slice(Keccak256::digest(&rlp::encode(&transaction)).as_slice());
let hash = self.client.info().best_hash;
let number = self.client.info().best_number;
let pending = self.pending_transactions.clone();
Box::pin(
self.pool
.submit_one(
Expand All @@ -806,20 +806,7 @@ where
self.convert_transaction
.convert_transaction(transaction.clone()),
)
.map_ok(move |_| {
if let Some(pending) = pending {
if let Ok(locked) = &mut pending.lock() {
locked.insert(
transaction_hash,
PendingTransaction::new(
transaction_build(transaction, None, None),
UniqueSaturatedInto::<u64>::unique_saturated_into(number),
),
);
}
}
transaction_hash
})
.map_ok(move |_| transaction_hash)
.map_err(|err| {
internal_err(format!("submit transaction to pool failed: {:?}", err))
}),
Expand All @@ -834,8 +821,6 @@ where
let transaction_hash =
H256::from_slice(Keccak256::digest(&rlp::encode(&transaction)).as_slice());
let hash = self.client.info().best_hash;
let number = self.client.info().best_number;
let pending = self.pending_transactions.clone();
Box::pin(
self.pool
.submit_one(
Expand All @@ -844,20 +829,7 @@ where
self.convert_transaction
.convert_transaction(transaction.clone()),
)
.map_ok(move |_| {
if let Some(pending) = pending {
if let Ok(locked) = &mut pending.lock() {
locked.insert(
transaction_hash,
PendingTransaction::new(
transaction_build(transaction, None, None),
UniqueSaturatedInto::<u64>::unique_saturated_into(number),
),
);
}
}
transaction_hash
})
.map_ok(move |_| transaction_hash)
.map_err(|err| {
internal_err(format!("submit transaction to pool failed: {:?}", err))
}),
Expand Down Expand Up @@ -1089,6 +1061,42 @@ where
}

fn transaction_by_hash(&self, hash: H256) -> Result<Option<Transaction>> {
let mut xts: Vec<<B as BlockT>::Extrinsic> = Vec::new();
// Collect transactions in the ready validated pool.
xts.extend(
self.graph
.validated_pool()
.ready()
.map(|in_pool_tx| in_pool_tx.data().clone())
.collect::<Vec<<B as BlockT>::Extrinsic>>(),
);

// Collect transactions in the future validated pool.
xts.extend(
self.graph
.validated_pool()
.futures()
.iter()
.map(|(_hash, extrinsic)| extrinsic.clone())
.collect::<Vec<<B as BlockT>::Extrinsic>>(),
);

let best_block: BlockId<B> = BlockId::Hash(self.client.info().best_hash);
let ethereum_transactions: Vec<ethereum::TransactionV0> = self
.client
.runtime_api()
.extrinsic_filter(&best_block, xts)
.map_err(|err| {
internal_err(format!("fetch runtime extrinsic filter failed: {:?}", err))
})?;

for txn in ethereum_transactions {
let inner_hash = H256::from_slice(Keccak256::digest(&rlp::encode(&txn)).as_slice());
if hash == inner_hash {
return Ok(Some(transaction_build(txn, None, None)));
}
}

let (hash, index) = match frontier_backend_client::load_transactions::<B, C>(
self.client.as_ref(),
self.backend.as_ref(),
Expand All @@ -1097,16 +1105,7 @@ where
.map_err(|err| internal_err(format!("{:?}", err)))?
{
Some((hash, index)) => (hash, index as usize),
None => {
if let Some(pending) = &self.pending_transactions {
if let Ok(locked) = &mut pending.lock() {
if let Some(pending_transaction) = locked.get(&hash) {
return Ok(Some(pending_transaction.transaction.clone()));
}
}
}
return Ok(None);
}
None => return Ok(None),
};

let id = match frontier_backend_client::load_hash::<B>(self.backend.as_ref(), hash)
Expand Down Expand Up @@ -1866,42 +1865,6 @@ where
}
}

pub async fn pending_transaction_task(
client: Arc<C>,
pending_transactions: Arc<Mutex<HashMap<H256, PendingTransaction>>>,
retain_threshold: u64,
) {
let mut notification_st = client.import_notification_stream();

while let Some(notification) = notification_st.next().await {
if notification.is_new_best {
if let Ok(mut pending_transactions) = pending_transactions.lock() {
// As pending transactions have a finite lifespan anyway
// we can ignore MultiplePostRuntimeLogs error checks.
let log = fp_consensus::find_log(&notification.header.digest()).ok();
let post_hashes = log.map(|log| log.into_hashes());

if let Some(post_hashes) = post_hashes {
// Retain all pending transactions that were not
// processed in the current block.
pending_transactions
.retain(|&k, _| !post_hashes.transaction_hashes.contains(&k));
}

let imported_number: u64 = UniqueSaturatedInto::<u64>::unique_saturated_into(
*notification.header.number(),
);

pending_transactions.retain(|_, v| {
// Drop all the transactions that exceeded the given lifespan.
let lifespan_limit = v.at_block + retain_threshold;
lifespan_limit > imported_number
});
}
}
}
}

pub async fn filter_pool_task(
client: Arc<C>,
filter_pool: Arc<Mutex<BTreeMap<U256, FilterPoolItem>>>,
Expand Down
2 changes: 1 addition & 1 deletion frame/dynamic-fee/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ mod tests {
frame_system::limits::BlockWeights::simple_max(1024);
}
impl frame_system::Config for Test {
type BaseCallFilter = ();
type BaseCallFilter = frame_support::traits::Everything;
type BlockWeights = ();
type BlockLength = ();
type DbWeight = ();
Expand Down
2 changes: 1 addition & 1 deletion frame/ethereum/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ parameter_types! {
}

impl frame_system::Config for Test {
type BaseCallFilter = ();
type BaseCallFilter = frame_support::traits::Everything;
type BlockWeights = ();
type BlockLength = ();
type DbWeight = ();
Expand Down
2 changes: 1 addition & 1 deletion frame/evm/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ parameter_types! {
frame_system::limits::BlockWeights::simple_max(1024);
}
impl frame_system::Config for Test {
type BaseCallFilter = ();
type BaseCallFilter = frame_support::traits::Everything;
type BlockWeights = ();
type BlockLength = ();
type DbWeight = ();
Expand Down
2 changes: 1 addition & 1 deletion template/node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub fn run() -> sc_cli::Result<()> {
if cfg!(feature = "runtime-benchmarks") {
let runner = cli.create_runner(cmd)?;

runner.sync_run(|config| cmd.run::<Block, service::Executor>(config))
runner.sync_run(|config| cmd.run::<Block, service::ExecutorDispatch>(config))
} else {
Err(
"Benchmarking wasn't enabled when building the node. You can enable it with `--features runtime-benchmarks`."
Expand Down
Loading