From bce5a67a58e22460f8ce0deb1e9fb0f0b001977d Mon Sep 17 00:00:00 2001 From: perekopskiy Date: Wed, 16 Oct 2024 17:16:51 +0300 Subject: [PATCH 1/4] improve mempool --- core/lib/mempool/src/mempool_store.rs | 51 ++++++++++++++++---- core/lib/mempool/src/tests.rs | 53 +++++++++++++++------ core/node/state_keeper/src/mempool_actor.rs | 30 +++++++++--- 3 files changed, 104 insertions(+), 30 deletions(-) diff --git a/core/lib/mempool/src/mempool_store.rs b/core/lib/mempool/src/mempool_store.rs index 334a4783a76c..47c6395cd7ab 100644 --- a/core/lib/mempool/src/mempool_store.rs +++ b/core/lib/mempool/src/mempool_store.rs @@ -1,4 +1,4 @@ -use std::collections::{hash_map, BTreeSet, HashMap, HashSet}; +use std::collections::{hash_map, BTreeSet, HashMap}; use zksync_types::{ l1::L1Tx, l2::L2Tx, Address, ExecuteTransactionCommon, Nonce, PriorityOpId, Transaction, @@ -221,22 +221,53 @@ impl MempoolStore { } fn gc(&mut self) -> Vec
{ - if self.size >= self.capacity { - let index: HashSet<_> = self + if self.size > self.capacity { + let mut transactions = std::mem::take(&mut self.l2_transactions_per_account); + let mut possibly_kept: Vec<_> = self .l2_priority_queue .iter() - .map(|pointer| pointer.account) + .rev() + .filter_map(|pointer| { + transactions + .remove(&pointer.account) + .map(|txs| (pointer.account, txs)) + }) .collect(); - let transactions = std::mem::take(&mut self.l2_transactions_per_account); - let (kept, drained) = transactions + let mut number_of_accounts_kept = possibly_kept + .iter() + .scan(0, |sum, (_, txs)| { + *sum += txs.len(); + (*sum <= self.capacity as usize).then_some(()) + }) + .count(); + if number_of_accounts_kept == 0 { + tracing::warn!("mempool capacity is too low to handle txs from single account, consider increasing capacity"); + // Keep at least one entry, otherwise mempool won't return any new L2 tx to process. + number_of_accounts_kept = 1; + } + let (kept, drained) = { + let mut drained: Vec<_> = transactions.into_keys().collect(); + let also_drained = possibly_kept + .split_off(number_of_accounts_kept) + .into_iter() + .map(|(address, _)| address); + drained.extend(also_drained); + + (possibly_kept, drained) + }; + + let l2_priority_queue = std::mem::take(&mut self.l2_priority_queue); + self.l2_priority_queue = l2_priority_queue .into_iter() - .partition(|(address, _)| index.contains(address)); - self.l2_transactions_per_account = kept; + .rev() + .take(number_of_accounts_kept) + .collect(); + self.l2_transactions_per_account = kept.into_iter().collect(); self.size = self .l2_transactions_per_account .iter() - .fold(0, |agg, (_, tnxs)| agg + tnxs.len() as u64); - return drained.into_keys().collect(); + .fold(0, |agg, (_, txs)| agg + txs.len() as u64); + return drained; } vec![] } diff --git a/core/lib/mempool/src/tests.rs b/core/lib/mempool/src/tests.rs index 96ef600984f9..ed107fc409c1 100644 --- a/core/lib/mempool/src/tests.rs +++ b/core/lib/mempool/src/tests.rs @@ -321,32 +321,26 @@ fn stashed_accounts() { #[test] fn mempool_capacity() { - let mut mempool = MempoolStore::new(PriorityOpId(0), 5); + let mut mempool = MempoolStore::new(PriorityOpId(0), 4); let account0 = Address::random(); let account1 = Address::random(); let account2 = Address::random(); + let account3 = Address::random(); let transactions = vec![ gen_l2_tx(account0, Nonce(0)), gen_l2_tx(account0, Nonce(1)), gen_l2_tx(account0, Nonce(2)), - gen_l2_tx(account1, Nonce(1)), - gen_l2_tx(account2, Nonce(1)), + gen_l2_tx_with_timestamp(account1, Nonce(0), unix_timestamp_ms() + 1), + gen_l2_tx_with_timestamp(account2, Nonce(0), unix_timestamp_ms() + 2), + gen_l2_tx(account3, Nonce(1)), ]; mempool.insert(transactions, HashMap::new()); - // the mempool is full. Accounts with non-sequential nonces got stashed + // Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score got stashed assert_eq!( HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), - HashSet::<_>::from_iter(vec![account1, account2]), - ); - // verify that existing good-to-go transactions and new ones got picked - mempool.insert( - vec![gen_l2_tx_with_timestamp( - account1, - Nonce(0), - unix_timestamp_ms() + 1, - )], - HashMap::new(), + HashSet::<_>::from_iter(vec![account2, account3]), ); + // verify that good-to-go transactions are kept. for _ in 0..3 { assert_eq!( mempool @@ -363,6 +357,37 @@ fn mempool_capacity() { .initiator_account(), account1 ); + assert!(!mempool.has_next(&L2TxFilter::default())); +} + +#[test] +fn mempool_does_not_purge_all_accounts() { + let mut mempool = MempoolStore::new(PriorityOpId(0), 1); + let account0 = Address::random(); + let account1 = Address::random(); + let transactions = vec![ + gen_l2_tx(account0, Nonce(0)), + gen_l2_tx(account0, Nonce(1)), + gen_l2_tx(account1, Nonce(1)), + ]; + mempool.insert(transactions, HashMap::new()); + // Mempool is full. Account 1 has tx with non-sequential nonce so it should be purged. + // Txs from account 0 have sequential nonces but their number is greater than capacity; they should be kept. + assert_eq!( + HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), + HashSet::<_>::from_iter(vec![account1]), + ); + // verify that good-to-go transactions are kept. + for _ in 0..2 { + assert_eq!( + mempool + .next_transaction(&L2TxFilter::default()) + .unwrap() + .initiator_account(), + account0 + ); + } + assert!(!mempool.has_next(&L2TxFilter::default())); } fn gen_l2_tx(address: Address, nonce: Nonce) -> Transaction { diff --git a/core/node/state_keeper/src/mempool_actor.rs b/core/node/state_keeper/src/mempool_actor.rs index dbe1e4cb977f..d19a769e21b8 100644 --- a/core/node/state_keeper/src/mempool_actor.rs +++ b/core/node/state_keeper/src/mempool_actor.rs @@ -89,12 +89,30 @@ impl MempoolFetcher { .await .context("failed getting pending protocol version")?; - let l2_tx_filter = l2_tx_filter( - self.batch_fee_input_provider.as_ref(), - protocol_version.into(), - ) - .await - .context("failed creating L2 transaction filter")?; + let l2_tx_filter = if let Some(unsealed_batch) = storage + .blocks_dal() + .get_unsealed_l1_batch() + .await + .context("failed getting unsealed batch")? + { + let (base_fee, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata( + unsealed_batch.fee_input, + protocol_version.into(), + ); + + L2TxFilter { + fee_input: unsealed_batch.fee_input, + fee_per_gas: base_fee, + gas_per_pubdata: gas_per_pubdata as u32, + } + } else { + l2_tx_filter( + self.batch_fee_input_provider.as_ref(), + protocol_version.into(), + ) + .await + .context("failed creating L2 transaction filter")? + }; let transactions = storage .transactions_dal() From f613407496319ec94628d2feb0db9d269ff2d11c Mon Sep 17 00:00:00 2001 From: perekopskiy Date: Wed, 16 Oct 2024 17:19:21 +0300 Subject: [PATCH 2/4] fix word in test --- core/lib/mempool/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/lib/mempool/src/tests.rs b/core/lib/mempool/src/tests.rs index ed107fc409c1..c5be498a4774 100644 --- a/core/lib/mempool/src/tests.rs +++ b/core/lib/mempool/src/tests.rs @@ -335,7 +335,7 @@ fn mempool_capacity() { gen_l2_tx(account3, Nonce(1)), ]; mempool.insert(transactions, HashMap::new()); - // Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score got stashed + // Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score should be purged. assert_eq!( HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), HashSet::<_>::from_iter(vec![account2, account3]), From 136d010c9516871c006a215f3ea54d9592133217 Mon Sep 17 00:00:00 2001 From: perekopskiy Date: Wed, 16 Oct 2024 18:41:11 +0300 Subject: [PATCH 3/4] fix after review --- core/lib/mempool/src/mempool_store.rs | 20 ++++++++++++-------- core/lib/mempool/src/tests.rs | 7 ++----- core/node/state_keeper/src/mempool_actor.rs | 21 +++++++++------------ 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/core/lib/mempool/src/mempool_store.rs b/core/lib/mempool/src/mempool_store.rs index 47c6395cd7ab..acaf3f6ee02f 100644 --- a/core/lib/mempool/src/mempool_store.rs +++ b/core/lib/mempool/src/mempool_store.rs @@ -233,14 +233,18 @@ impl MempoolStore { .map(|txs| (pointer.account, txs)) }) .collect(); - let mut number_of_accounts_kept = possibly_kept - .iter() - .scan(0, |sum, (_, txs)| { - *sum += txs.len(); - (*sum <= self.capacity as usize).then_some(()) - }) - .count(); - if number_of_accounts_kept == 0 { + + let mut sum = 0; + let mut number_of_accounts_kept = 0; + for (_, txs) in possibly_kept { + sum += txs.len(); + if sum <= self.capacity as usize { + number_of_accounts_kept += 1; + } else { + break; + } + } + if number_of_accounts_kept == 0 && !possibly_kept.is_empty() { tracing::warn!("mempool capacity is too low to handle txs from single account, consider increasing capacity"); // Keep at least one entry, otherwise mempool won't return any new L2 tx to process. number_of_accounts_kept = 1; diff --git a/core/lib/mempool/src/tests.rs b/core/lib/mempool/src/tests.rs index c5be498a4774..b84ab7d57651 100644 --- a/core/lib/mempool/src/tests.rs +++ b/core/lib/mempool/src/tests.rs @@ -338,7 +338,7 @@ fn mempool_capacity() { // Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score should be purged. assert_eq!( HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), - HashSet::<_>::from_iter(vec![account2, account3]), + HashSet::from([account2, account3]), ); // verify that good-to-go transactions are kept. for _ in 0..3 { @@ -373,10 +373,7 @@ fn mempool_does_not_purge_all_accounts() { mempool.insert(transactions, HashMap::new()); // Mempool is full. Account 1 has tx with non-sequential nonce so it should be purged. // Txs from account 0 have sequential nonces but their number is greater than capacity; they should be kept. - assert_eq!( - HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts), - HashSet::<_>::from_iter(vec![account1]), - ); + assert_eq!(mempool.get_mempool_info().purged_accounts, vec![account1]); // verify that good-to-go transactions are kept. for _ in 0..2 { assert_eq!( diff --git a/core/node/state_keeper/src/mempool_actor.rs b/core/node/state_keeper/src/mempool_actor.rs index d19a769e21b8..a17f2670cbb9 100644 --- a/core/node/state_keeper/src/mempool_actor.rs +++ b/core/node/state_keeper/src/mempool_actor.rs @@ -89,29 +89,26 @@ impl MempoolFetcher { .await .context("failed getting pending protocol version")?; - let l2_tx_filter = if let Some(unsealed_batch) = storage + let (fee_per_gas, gas_per_pubdata) = if let Some(unsealed_batch) = storage .blocks_dal() .get_unsealed_l1_batch() .await .context("failed getting unsealed batch")? { - let (base_fee, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata( + let (fee_per_gas, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata( unsealed_batch.fee_input, protocol_version.into(), ); - - L2TxFilter { - fee_input: unsealed_batch.fee_input, - fee_per_gas: base_fee, - gas_per_pubdata: gas_per_pubdata as u32, - } + (fee_per_gas, gas_per_pubdata as u32) } else { - l2_tx_filter( + let filter = l2_tx_filter( self.batch_fee_input_provider.as_ref(), protocol_version.into(), ) .await - .context("failed creating L2 transaction filter")? + .context("failed creating L2 transaction filter")?; + + (filter.fee_per_gas, filter.gas_per_pubdata) }; let transactions = storage @@ -119,8 +116,8 @@ impl MempoolFetcher { .sync_mempool( &mempool_info.stashed_accounts, &mempool_info.purged_accounts, - l2_tx_filter.gas_per_pubdata, - l2_tx_filter.fee_per_gas, + gas_per_pubdata, + fee_per_gas, self.sync_batch_size, ) .await From dbb31100e590f4735c359e1212797020bb127557 Mon Sep 17 00:00:00 2001 From: perekopskiy Date: Wed, 16 Oct 2024 18:45:51 +0300 Subject: [PATCH 4/4] fix --- core/lib/mempool/src/mempool_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/lib/mempool/src/mempool_store.rs b/core/lib/mempool/src/mempool_store.rs index acaf3f6ee02f..f6f9b72f9b64 100644 --- a/core/lib/mempool/src/mempool_store.rs +++ b/core/lib/mempool/src/mempool_store.rs @@ -236,7 +236,7 @@ impl MempoolStore { let mut sum = 0; let mut number_of_accounts_kept = 0; - for (_, txs) in possibly_kept { + for (_, txs) in &possibly_kept { sum += txs.len(); if sum <= self.capacity as usize { number_of_accounts_kept += 1;