From 3f876b8045fa27503e54e48dfdebd0e30c763ba4 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Mon, 25 Nov 2024 00:10:54 -0800 Subject: [PATCH] Remove UnprocessedTransactionStorage::LocalTransactionStorage --- core/src/banking_stage.rs | 12 +- core/src/banking_stage/forwarder.rs | 2 +- .../unprocessed_transaction_storage.rs | 549 +----------------- 3 files changed, 22 insertions(+), 541 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1e776d51724149..edb4d4c8d54835 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -12,8 +12,7 @@ use { leader_slot_metrics::LeaderSlotMetricsTracker, packet_receiver::PacketReceiver, qos_service::QosService, - unprocessed_packet_batches::*, - unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, + unprocessed_transaction_storage::UnprocessedTransactionStorage, }, crate::{ banking_stage::{ @@ -435,8 +434,6 @@ impl BankingStage { // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. let data_budget = Arc::new(DataBudget::default()); - let batch_limit = - TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); // Keeps track of extraneous vote transactions for the vote threads let latest_unprocessed_votes = { let bank = bank_forks.read().unwrap().working_bank(); @@ -471,9 +468,10 @@ impl BankingStage { ), _ => ( non_vote_receiver.clone(), - UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::with_capacity(batch_limit), - ThreadType::Transactions, + // TODO: Check if this is alright. + UnprocessedTransactionStorage::new_vote_storage( + latest_unprocessed_votes.clone(), + VoteSource::Tpu, ), ), }; diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 0f54f6eb5e62b2..19f614656a7d1b 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -548,7 +548,7 @@ mod tests { (packet.meta().size, DeserializedPacket::new(packet).unwrap()) }; - let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( + let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_vote_storage( UnprocessedPacketBatches::from_iter( vec![ forwarded_packet, diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index e86780002ea694..b465e99d371185 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -11,19 +11,17 @@ use { multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision}, read_write_account_set::ReadWriteAccountSet, unprocessed_packet_batches::{ - DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches, + DeserializedPacket, PacketBatchInsertionMetrics, }, BankingStageStats, FilterForwardingResults, ForwardOption, }, itertools::Itertools, - min_max_heap::MinMaxHeap, solana_accounts_db::account_locks::validate_account_locks, - solana_feature_set::FeatureSet, solana_measure::measure_us, solana_runtime::bank::Bank, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ - clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, hash::Hash, saturating_add_assign, + hash::Hash, transaction::SanitizedTransaction, }, solana_svm::transaction_error_metrics::TransactionErrorMetrics, @@ -43,13 +41,6 @@ const MAX_NUM_VOTES_RECEIVE: usize = 10_000; #[derive(Debug)] pub enum UnprocessedTransactionStorage { VoteStorage(VoteStorage), - LocalTransactionStorage(ThreadLocalUnprocessedPackets), -} - -#[derive(Debug)] -pub struct ThreadLocalUnprocessedPackets { - unprocessed_packet_batches: UnprocessedPacketBatches, - thread_type: ThreadType, } #[derive(Debug)] @@ -114,25 +105,6 @@ impl From for InsertPacketBatchSummary { } } -fn filter_processed_packets<'a, F>( - retryable_transaction_indexes: impl Iterator, - mut f: F, -) where - F: FnMut(usize, usize), -{ - let mut prev_retryable_index = 0; - for (i, retryable_index) in retryable_transaction_indexes.enumerate() { - let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; - - let end = *retryable_index; - prev_retryable_index = *retryable_index; - - if start < end { - f(start, end) - } - } -} - /// Convenient wrapper for shared-state between banking stage processing and the /// multi-iterator checking function. pub struct ConsumeScannerPayload<'a> { @@ -256,16 +228,6 @@ where } impl UnprocessedTransactionStorage { - pub fn new_transaction_storage( - unprocessed_packet_batches: UnprocessedPacketBatches, - thread_type: ThreadType, - ) -> Self { - Self::LocalTransactionStorage(ThreadLocalUnprocessedPackets { - unprocessed_packet_batches, - thread_type, - }) - } - pub fn new_vote_storage( latest_unprocessed_votes: Arc, vote_source: VoteSource, @@ -279,32 +241,24 @@ impl UnprocessedTransactionStorage { pub fn is_empty(&self) -> bool { match self { Self::VoteStorage(vote_storage) => vote_storage.is_empty(), - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.is_empty(), } } pub fn len(&self) -> usize { match self { Self::VoteStorage(vote_storage) => vote_storage.len(), - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.len(), } } pub fn get_min_priority(&self) -> Option { match self { Self::VoteStorage(_) => None, - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.get_min_compute_unit_price() - } } } pub fn get_max_priority(&self) -> Option { match self { Self::VoteStorage(_) => None, - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.get_max_compute_unit_price() - } } } @@ -312,25 +266,19 @@ impl UnprocessedTransactionStorage { pub fn max_receive_size(&self) -> usize { match self { Self::VoteStorage(vote_storage) => vote_storage.max_receive_size(), - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.max_receive_size() - } } } pub fn should_not_process(&self) -> bool { // The gossip vote thread does not need to process or forward any votes, that is // handled by the tpu vote thread - if let Self::VoteStorage(vote_storage) = self { - return matches!(vote_storage.vote_source, VoteSource::Gossip); - } - false + let Self::VoteStorage(vote_storage) = self; + return matches!(vote_storage.vote_source, VoteSource::Gossip); } #[cfg(test)] pub fn iter(&mut self) -> impl Iterator { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.iter(), _ => panic!(), } } @@ -338,15 +286,11 @@ impl UnprocessedTransactionStorage { pub fn forward_option(&self) -> ForwardOption { match self { Self::VoteStorage(vote_storage) => vote_storage.forward_option(), - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.forward_option() - } } } pub fn clear_forwarded_packets(&mut self) { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.clear(), // Since we set everything as forwarded this is the same Self::VoteStorage(vote_storage) => vote_storage.clear_forwarded_packets(), } } @@ -359,9 +303,6 @@ impl UnprocessedTransactionStorage { Self::VoteStorage(vote_storage) => { InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) } - Self::LocalTransactionStorage(transaction_storage) => InsertPacketBatchSummary::from( - transaction_storage.insert_batch(deserialized_packets), - ), } } @@ -371,11 +312,6 @@ impl UnprocessedTransactionStorage { forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> FilterForwardingResults { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage - .filter_forwardable_packets_and_add_batches( - bank, - forward_packet_batches_by_accounts, - ), Self::VoteStorage(vote_storage) => vote_storage .filter_forwardable_packets_and_add_batches( bank, @@ -402,13 +338,6 @@ impl UnprocessedTransactionStorage { ) -> Option>, { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage - .process_packets( - &bank, - banking_stage_stats, - slot_metrics_tracker, - processing_function, - ), Self::VoteStorage(vote_storage) => vote_storage.process_packets( bank, banking_stage_stats, @@ -420,7 +349,6 @@ impl UnprocessedTransactionStorage { pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { match self { - Self::LocalTransactionStorage(_) => (), Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), } } @@ -572,449 +500,6 @@ impl VoteStorage { } } -impl ThreadLocalUnprocessedPackets { - fn is_empty(&self) -> bool { - self.unprocessed_packet_batches.is_empty() - } - - pub fn thread_type(&self) -> ThreadType { - self.thread_type - } - - fn len(&self) -> usize { - self.unprocessed_packet_batches.len() - } - - pub fn get_min_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_min_compute_unit_price() - } - - pub fn get_max_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_max_compute_unit_price() - } - - fn max_receive_size(&self) -> usize { - self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len() - } - - #[cfg(test)] - fn iter(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter_mut() - } - - fn forward_option(&self) -> ForwardOption { - match self.thread_type { - ThreadType::Transactions => ForwardOption::ForwardTransaction, - ThreadType::Voting(VoteSource::Tpu) => ForwardOption::ForwardTpuVote, - ThreadType::Voting(VoteSource::Gossip) => ForwardOption::NotForward, - } - } - - fn clear(&mut self) { - self.unprocessed_packet_batches.clear(); - } - - fn insert_batch( - &mut self, - deserialized_packets: Vec, - ) -> PacketBatchInsertionMetrics { - self.unprocessed_packet_batches.insert_batch( - deserialized_packets - .into_iter() - .map(DeserializedPacket::from_immutable_section), - ) - } - - /// Filter out packets that fail to sanitize, or are no longer valid (could be - /// too old, a duplicate of something already processed). Doing this in batches to avoid - /// checking bank's blockhash and status cache per transaction which could be bad for performance. - /// Added valid and sanitized packets to forwarding queue. - fn filter_forwardable_packets_and_add_batches( - &mut self, - bank: Arc, - forward_buffer: &mut ForwardPacketBatchesByAccounts, - ) -> FilterForwardingResults { - let mut total_forwardable_tracer_packets: usize = 0; - let mut total_tracer_packets_in_buffer: usize = 0; - let mut total_forwardable_packets: usize = 0; - let mut total_packet_conversion_us: u64 = 0; - let mut total_filter_packets_us: u64 = 0; - let mut total_dropped_packets: usize = 0; - - let mut original_priority_queue = self.take_priority_queue(); - let original_capacity = original_priority_queue.capacity(); - let mut new_priority_queue = MinMaxHeap::with_capacity(original_capacity); - - // indicates if `forward_buffer` still accept more packets, see details at - // `ForwardPacketBatchesByAccounts.rs`. - let mut accepting_packets = true; - // batch iterate through self.unprocessed_packet_batches in desc priority order - new_priority_queue.extend( - original_priority_queue - .drain_desc() - .chunks(UNPROCESSED_BUFFER_STEP_SIZE) - .into_iter() - .flat_map(|packets_to_process| { - // Only process packets not yet forwarded - let (forwarded_packets, packets_to_forward, is_tracer_packet) = self - .prepare_packets_to_forward( - packets_to_process, - &mut total_tracer_packets_in_buffer, - ); - - [ - forwarded_packets, - if accepting_packets { - let ( - (sanitized_transactions, transaction_to_packet_indexes), - packet_conversion_us, - ) = measure_us!(self.sanitize_unforwarded_packets( - &packets_to_forward, - &bank, - &mut total_dropped_packets - )); - saturating_add_assign!( - total_packet_conversion_us, - packet_conversion_us - ); - - let (forwardable_transaction_indexes, filter_packets_us) = - measure_us!(Self::filter_invalid_transactions( - &sanitized_transactions, - &bank, - &mut total_dropped_packets - )); - saturating_add_assign!(total_filter_packets_us, filter_packets_us); - - for forwardable_transaction_index in &forwardable_transaction_indexes { - saturating_add_assign!(total_forwardable_packets, 1); - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - if is_tracer_packet[forwardable_packet_index] { - saturating_add_assign!(total_forwardable_tracer_packets, 1); - } - } - - let accepted_packet_indexes = - Self::add_filtered_packets_to_forward_buffer( - forward_buffer, - &packets_to_forward, - &sanitized_transactions, - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - &mut total_dropped_packets, - &bank.feature_set, - ); - accepting_packets = accepted_packet_indexes.len() - == forwardable_transaction_indexes.len(); - - self.unprocessed_packet_batches - .mark_accepted_packets_as_forwarded( - &packets_to_forward, - &accepted_packet_indexes, - ); - - Self::collect_retained_packets( - &mut self.unprocessed_packet_batches.message_hash_to_transaction, - &packets_to_forward, - &Self::prepare_filtered_packet_indexes( - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - ), - ) - } else { - // skip sanitizing and filtering if not longer able to add more packets for forwarding - saturating_add_assign!(total_dropped_packets, packets_to_forward.len()); - packets_to_forward - }, - ] - .concat() - }), - ); - - // replace packet priority queue - self.unprocessed_packet_batches.packet_priority_queue = new_priority_queue; - self.verify_priority_queue(original_capacity); - - // Assert unprocessed queue is still consistent - assert_eq!( - self.unprocessed_packet_batches.packet_priority_queue.len(), - self.unprocessed_packet_batches - .message_hash_to_transaction - .len() - ); - - FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - total_dropped_packets, - total_packet_conversion_us, - total_filter_packets_us, - } - } - - /// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. - fn take_priority_queue(&mut self) -> MinMaxHeap> { - std::mem::replace( - &mut self.unprocessed_packet_batches.packet_priority_queue, - MinMaxHeap::new(), // <-- no need to reserve capacity as we will replace this - ) - } - - /// Verify that the priority queue and map are consistent and that original capacity is maintained. - fn verify_priority_queue(&self, original_capacity: usize) { - // Assert unprocessed queue is still consistent and maintains original capacity - assert_eq!( - self.unprocessed_packet_batches - .packet_priority_queue - .capacity(), - original_capacity - ); - assert_eq!( - self.unprocessed_packet_batches.packet_priority_queue.len(), - self.unprocessed_packet_batches - .message_hash_to_transaction - .len() - ); - } - - /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. - fn sanitize_unforwarded_packets( - &mut self, - packets_to_process: &[Arc], - bank: &Bank, - total_dropped_packets: &mut usize, - ) -> (Vec>, Vec) { - // Get ref of ImmutableDeserializedPacket - let deserialized_packets = packets_to_process.iter().map(|p| &**p); - let (transactions, transaction_to_packet_indexes): (Vec<_>, Vec<_>) = deserialized_packets - .enumerate() - .filter_map(|(packet_index, deserialized_packet)| { - deserialized_packet - .build_sanitized_transaction( - bank.vote_only_bank(), - bank, - bank.get_reserved_account_keys(), - ) - .map(|(transaction, _deactivation_slot)| (transaction, packet_index)) - }) - .unzip(); - - let filtered_count = packets_to_process.len().saturating_sub(transactions.len()); - saturating_add_assign!(*total_dropped_packets, filtered_count); - - (transactions, transaction_to_packet_indexes) - } - - /// Checks sanitized transactions against bank, returns valid transaction indexes - fn filter_invalid_transactions( - transactions: &[RuntimeTransaction], - bank: &Bank, - total_dropped_packets: &mut usize, - ) -> Vec { - let filter = vec![Ok(()); transactions.len()]; - let results = bank.check_transactions_with_forwarding_delay( - transactions, - &filter, - FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, - ); - - let filtered_count = transactions.len().saturating_sub(results.len()); - saturating_add_assign!(*total_dropped_packets, filtered_count); - - results - .iter() - .enumerate() - .filter_map(|(tx_index, result)| result.as_ref().ok().map(|_| tx_index)) - .collect_vec() - } - - fn prepare_filtered_packet_indexes( - transaction_to_packet_indexes: &[usize], - retained_transaction_indexes: &[usize], - ) -> Vec { - retained_transaction_indexes - .iter() - .map(|tx_index| transaction_to_packet_indexes[*tx_index]) - .collect_vec() - } - - /// try to add filtered forwardable and valid packets to forward buffer; - /// returns vector of packet indexes that were accepted for forwarding. - fn add_filtered_packets_to_forward_buffer( - forward_buffer: &mut ForwardPacketBatchesByAccounts, - packets_to_process: &[Arc], - transactions: &[RuntimeTransaction], - transaction_to_packet_indexes: &[usize], - forwardable_transaction_indexes: &[usize], - total_dropped_packets: &mut usize, - feature_set: &FeatureSet, - ) -> Vec { - let mut added_packets_count: usize = 0; - let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len()); - for forwardable_transaction_index in forwardable_transaction_indexes { - let sanitized_transaction = &transactions[*forwardable_transaction_index]; - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - let immutable_deserialized_packet = - packets_to_process[forwardable_packet_index].clone(); - if !forward_buffer.try_add_packet( - sanitized_transaction, - immutable_deserialized_packet, - feature_set, - ) { - break; - } - accepted_packet_indexes.push(forwardable_packet_index); - saturating_add_assign!(added_packets_count, 1); - } - - let filtered_count = forwardable_transaction_indexes - .len() - .saturating_sub(added_packets_count); - saturating_add_assign!(*total_dropped_packets, filtered_count); - - accepted_packet_indexes - } - - fn collect_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) -> Vec> { - Self::remove_non_retained_packets( - message_hash_to_transaction, - packets_to_process, - retained_packet_indexes, - ); - retained_packet_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() - } - - /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have - /// been removed from UnprocessedPacketBatches.packet_priority_queue - fn remove_non_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) { - filter_processed_packets( - retained_packet_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - message_hash_to_transaction.remove(processed_packet.message_hash()); - } - }, - ) - } - - // returns `true` if reached end of slot - fn process_packets( - &mut self, - bank: &Bank, - banking_stage_stats: &BankingStageStats, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - mut processing_function: F, - ) -> bool - where - F: FnMut( - &Vec>, - &mut ConsumeScannerPayload, - ) -> Option>, - { - let mut retryable_packets = self.take_priority_queue(); - let original_capacity = retryable_packets.capacity(); - let mut new_retryable_packets = MinMaxHeap::with_capacity(original_capacity); - let all_packets_to_process = retryable_packets.drain_desc().collect_vec(); - - let should_process_packet = - |packet: &Arc, payload: &mut ConsumeScannerPayload| { - consume_scan_should_process_packet(bank, banking_stage_stats, packet, payload) - }; - let mut scanner = create_consume_multi_iterator( - &all_packets_to_process, - slot_metrics_tracker, - &mut self.unprocessed_packet_batches.message_hash_to_transaction, - should_process_packet, - ); - - while let Some((packets_to_process, payload)) = scanner.iterate() { - let packets_to_process = packets_to_process - .iter() - .map(|p| (*p).clone()) - .collect_vec(); - let retryable_packets = if let Some(retryable_transaction_indexes) = - processing_function(&packets_to_process, payload) - { - Self::collect_retained_packets( - payload.message_hash_to_transaction, - &packets_to_process, - &retryable_transaction_indexes, - ) - } else { - packets_to_process - }; - - new_retryable_packets.extend(retryable_packets); - } - - let reached_end_of_slot = scanner.finalize().payload.reached_end_of_slot; - - self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; - self.verify_priority_queue(original_capacity); - - reached_end_of_slot - } - - /// Prepare a chunk of packets for forwarding, filter out already forwarded packets while - /// counting tracers. - /// Returns Vec of unforwarded packets, and Vec of same size each indicates corresponding - /// packet is tracer packet. - fn prepare_packets_to_forward( - &self, - packets_to_forward: impl Iterator>, - total_tracer_packets_in_buffer: &mut usize, - ) -> ( - Vec>, - Vec>, - Vec, - ) { - let mut forwarded_packets: Vec> = vec![]; - let (forwardable_packets, is_tracer_packet) = packets_to_forward - .into_iter() - .filter_map(|immutable_deserialized_packet| { - let is_tracer_packet = immutable_deserialized_packet - .original_packet() - .meta() - .is_tracer_packet(); - if is_tracer_packet { - saturating_add_assign!(*total_tracer_packets_in_buffer, 1); - } - if !self - .unprocessed_packet_batches - .is_forwarded(&immutable_deserialized_packet) - { - Some((immutable_deserialized_packet, is_tracer_packet)) - } else { - forwarded_packets.push(immutable_deserialized_packet); - None - } - }) - .unzip(); - - (forwarded_packets, forwardable_packets, is_tracer_packet) - } -} - #[cfg(test)] mod tests { use { @@ -1123,11 +608,9 @@ mod tests { // all packets are forwarded { - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - buffered_packet_batches, - ThreadType::Transactions, + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(latest_unprocessed_votes), + vote_source, ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); @@ -1164,9 +647,9 @@ mod tests { } let buffered_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - buffered_packet_batches, - ThreadType::Transactions, + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(latest_unprocessed_votes), + vote_source, ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); @@ -1198,9 +681,9 @@ mod tests { } let buffered_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - buffered_packet_batches, - ThreadType::Transactions, + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(latest_unprocessed_votes), + vote_source, ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); @@ -1257,9 +740,9 @@ mod tests { ThreadType::Voting(VoteSource::Gossip), ThreadType::Voting(VoteSource::Tpu), ] { - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::with_capacity(100), - thread_type, + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(latest_unprocessed_votes), + vote_source, ); transaction_storage.insert_batch(vec![ ImmutableDeserializedPacket::new(small_transfer.clone())?,