From eb42e9bed2eee423d491c276a8a03c97c34ca24a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 21:21:47 +0000 Subject: [PATCH 01/19] Add prio_graph_scheduler.rs --- core/src/banking_stage/transaction_scheduler/mod.rs | 2 ++ .../banking_stage/transaction_scheduler/prio_graph_scheduler.rs | 1 + 2 files changed, 3 insertions(+) create mode 100644 core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index 96639f4ad40205..c6c8cf06a69a00 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -11,4 +11,6 @@ mod batch_id_generator; #[allow(dead_code)] mod in_flight_tracker; #[allow(dead_code)] +mod prio_graph_scheduler; +#[allow(dead_code)] mod transaction_id_generator; diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs new file mode 100644 index 00000000000000..8b137891791fe9 --- /dev/null +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -0,0 +1 @@ + From db4a9eff31b38795851692d1340dcafb41c59250 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 21:23:55 +0000 Subject: [PATCH 02/19] Add prio-graph dependency --- Cargo.lock | 7 +++++++ Cargo.toml | 9 +++++---- core/Cargo.toml | 1 + 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 378a93e91df240..dc749ead0d0c0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3933,6 +3933,12 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "prio-graph" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78dd2fa9ca0901b4d0dbf51d9862d7e3fb004605e4f4b4132472c3d08e7d901b" + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -5745,6 +5751,7 @@ dependencies = [ "lru", "min-max-heap", "num_enum 0.7.0", + "prio-graph", "quinn", "rand 0.8.5", "rand_chacha 0.3.1", diff --git a/Cargo.toml b/Cargo.toml index 05c6241523883a..a9047e2e91c7f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,9 +116,7 @@ members = [ "zk-token-sdk", ] -exclude = [ - "programs/sbf", -] +exclude = ["programs/sbf"] # This prevents a Travis CI error when building for Windows. 
resolver = "2" @@ -220,7 +218,9 @@ indexmap = "2.0.2" indicatif = "0.17.7" Inflector = "0.11.4" itertools = "0.10.5" -jemallocator = { package = "tikv-jemallocator", version = "0.4.1", features = ["unprefixed_malloc_on_supported_platforms"] } +jemallocator = { package = "tikv-jemallocator", version = "0.4.1", features = [ + "unprefixed_malloc_on_supported_platforms", +] } js-sys = "0.3.64" json5 = "0.4.1" jsonrpc-core = "18.0.0" @@ -259,6 +259,7 @@ pickledb = { version = "0.5.1", default-features = false } pkcs8 = "0.8.0" predicates = "2.1" pretty-hex = "0.3.0" +prio-graph = "0.1.0" proc-macro2 = "1.0.69" proptest = "1.2" prost = "0.11.9" diff --git a/core/Cargo.toml b/core/Cargo.toml index c3923613b768a2..78859f54e9cf4a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -31,6 +31,7 @@ log = { workspace = true } lru = { workspace = true } min-max-heap = { workspace = true } num_enum = { workspace = true } +prio-graph = { workspace = true } quinn = { workspace = true } rand = { workspace = true } rand_chacha = { workspace = true } From fb0a52df1e29a93887e8029e654872b099647521 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 21:33:41 +0000 Subject: [PATCH 03/19] Basic structure for schedule --- .../transaction_scheduler/mod.rs | 1 + .../prio_graph_scheduler.rs | 44 +++++++++++++++++++ .../transaction_scheduler/scheduler_error.rs | 4 ++ 3 files changed, 49 insertions(+) create mode 100644 core/src/banking_stage/transaction_scheduler/scheduler_error.rs diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index c6c8cf06a69a00..bf6f761baca88c 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -12,5 +12,6 @@ mod batch_id_generator; mod in_flight_tracker; #[allow(dead_code)] mod prio_graph_scheduler; +mod scheduler_error; #[allow(dead_code)] mod transaction_id_generator; diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 8b137891791fe9..79ad8b49890173 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -1 +1,45 @@ +use { + super::{ + in_flight_tracker::InFlightTracker, scheduler_error::SchedulerError, + thread_aware_account_locks::ThreadAwareAccountLocks, + transaction_state_container::TransactionStateContainer, + }, + crate::banking_stage::scheduler_messages::{ConsumeWork, FinishedConsumeWork}, + crossbeam_channel::{Receiver, Sender}, +}; +pub(crate) struct PrioGraphScheduler { + in_flight_tracker: InFlightTracker, + account_locks: ThreadAwareAccountLocks, + consume_work_senders: Vec>, + consume_work_receiver: Receiver, +} + +impl PrioGraphScheduler { + pub(crate) fn new( + consume_work_senders: Vec>, + consume_work_receiver: Receiver, + ) -> Self { + let num_threads = consume_work_senders.len(); + Self { + in_flight_tracker: InFlightTracker::new(num_threads), + account_locks: ThreadAwareAccountLocks::new(num_threads), + consume_work_senders, + consume_work_receiver, + } + } + + /// Schedule transactions from the given `TransactionStateContainer` to be consumed by the + /// worker threads. Returns the number of transactions scheduled, or an error. + /// + /// Uses a `PrioGraph` to perform look-ahead during the scheduling of transactions. 
+ /// This, combined with internal tracking of threads' in-flight transactions, allows + /// for load-balancing while prioritizing scheduling transactions onto threads that will + /// not cause conflicts in the near future. + pub(crate) fn schedule( + &mut self, + _container: &mut TransactionStateContainer, + ) -> Result { + todo!() + } +} diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs new file mode 100644 index 00000000000000..4171749ecdb255 --- /dev/null +++ b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs @@ -0,0 +1,4 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SchedulerError {} From 099c4d7b61d2953cebb2cf99340f2272dc9fe4bd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 21:36:37 +0000 Subject: [PATCH 04/19] Add Batches --- .../prio_graph_scheduler.rs | 61 ++++++++++++++++++- .../transaction_scheduler/scheduler_error.rs | 5 +- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 79ad8b49890173..e43654cadd0ba1 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -1,11 +1,17 @@ use { super::{ - in_flight_tracker::InFlightTracker, scheduler_error::SchedulerError, - thread_aware_account_locks::ThreadAwareAccountLocks, + in_flight_tracker::InFlightTracker, + scheduler_error::SchedulerError, + thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId}, transaction_state_container::TransactionStateContainer, }, - crate::banking_stage::scheduler_messages::{ConsumeWork, FinishedConsumeWork}, + crate::banking_stage::{ + consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + read_write_account_set::ReadWriteAccountSet, + scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionId}, + }, crossbeam_channel::{Receiver, Sender}, + solana_sdk::{slot_history::Slot, transaction::SanitizedTransaction}, }; pub(crate) struct PrioGraphScheduler { @@ -43,3 +49,52 @@ impl PrioGraphScheduler { todo!() } } + +struct Batches { + ids: Vec>, + transactions: Vec>, + max_age_slots: Vec>, + total_cus: Vec, + locks: Vec, +} + +impl Batches { + fn new(num_threads: usize) -> Self { + Self { + ids: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + transactions: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + max_age_slots: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + total_cus: vec![0; num_threads], + locks: std::iter::repeat_with(ReadWriteAccountSet::default) + .take(num_threads) + .collect(), + } + } + + fn take_batch( + &mut self, + thread_id: ThreadId, + ) -> ( + Vec, + Vec, + Vec, + u64, + ) { + self.locks[thread_id].clear(); + ( + core::mem::replace( + &mut self.ids[thread_id], + Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + ), + core::mem::replace( + &mut self.transactions[thread_id], + Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + ), + core::mem::replace( + &mut self.max_age_slots[thread_id], + Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + ), + core::mem::replace(&mut self.total_cus[thread_id], 0), + ) + } +} diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs index 
4171749ecdb255..01abbd7f29f4cc 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs @@ -1,4 +1,7 @@ use thiserror::Error; #[derive(Debug, Error)] -pub enum SchedulerError {} +pub enum SchedulerError { + // #[error("Sending channel disconnected: {0}")] + // DisconnectedSendChannel(&'static str), +} From ff3a94ff27345d6f01cd565c0e803b80075b7f98 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 22:27:51 +0000 Subject: [PATCH 05/19] send_batch(es) --- .../prio_graph_scheduler.rs | 39 +++++++++++++++++++ .../transaction_scheduler/scheduler_error.rs | 4 +- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index e43654cadd0ba1..fdf9b2bdf855cf 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -48,6 +48,45 @@ impl PrioGraphScheduler { ) -> Result { todo!() } + + /// Send all batches of transactions to the worker threads. + /// Returns the number of transactions sent. + fn send_batches(&mut self, batches: &mut Batches) -> Result { + (0..self.consume_work_senders.len()) + .map(|thread_index| self.send_batch(batches, thread_index)) + .sum() + } + + /// Send a batch of transactions to the given thread's `ConsumeWork` channel. + /// Returns the number of transactions sent. + fn send_batch( + &mut self, + batches: &mut Batches, + thread_index: usize, + ) -> Result { + if batches.ids[thread_index].is_empty() { + return Ok(0); + } + + let (ids, transactions, max_age_slots, total_cus) = batches.take_batch(thread_index); + + let batch_id = self + .in_flight_tracker + .track_batch(ids.len(), total_cus, thread_index); + + let num_scheduled = ids.len(); + let work = ConsumeWork { + batch_id, + ids, + transactions, + max_age_slots, + }; + self.consume_work_senders[thread_index] + .send(work) + .map_err(|_| SchedulerError::DisconnectedSendChannel("consume work sender"))?; + + Ok(num_scheduled) + } } struct Batches { diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs index 01abbd7f29f4cc..50715a19e4b7ef 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs @@ -2,6 +2,6 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum SchedulerError { - // #[error("Sending channel disconnected: {0}")] - // DisconnectedSendChannel(&'static str), + #[error("Sending channel disconnected: {0}")] + DisconnectedSendChannel(&'static str), } From 7014bb74a4b6aab6e11e07a7910450336ee8aad2 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 22:32:06 +0000 Subject: [PATCH 06/19] complete_batch --- .../prio_graph_scheduler.rs | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index fdf9b2bdf855cf..a59d27123da46f 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -8,9 +8,9 @@ use { crate::banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, 
read_write_account_set::ReadWriteAccountSet, - scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionId}, + scheduler_messages::{ConsumeWork, TransactionBatchId, TransactionId}, }, - crossbeam_channel::{Receiver, Sender}, + crossbeam_channel::Sender, solana_sdk::{slot_history::Slot, transaction::SanitizedTransaction}, }; @@ -18,20 +18,15 @@ pub(crate) struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec>, - consume_work_receiver: Receiver, } impl PrioGraphScheduler { - pub(crate) fn new( - consume_work_senders: Vec>, - consume_work_receiver: Receiver, - ) -> Self { + pub(crate) fn new(consume_work_senders: Vec>) -> Self { let num_threads = consume_work_senders.len(); Self { in_flight_tracker: InFlightTracker::new(num_threads), account_locks: ThreadAwareAccountLocks::new(num_threads), consume_work_senders, - consume_work_receiver, } } @@ -49,6 +44,24 @@ impl PrioGraphScheduler { todo!() } + /// Mark a given `TransactionBatchId` as completed. + /// This will update the internal tracking, including account locks. + pub(crate) fn complete_batch( + &mut self, + batch_id: TransactionBatchId, + transactions: &[SanitizedTransaction], + ) { + let thread_id = self.in_flight_tracker.complete_batch(batch_id); + for transaction in transactions { + let account_locks = transaction.get_account_locks_unchecked(); + self.account_locks.unlock_accounts( + account_locks.writable.into_iter(), + account_locks.readonly.into_iter(), + thread_id, + ); + } + } + /// Send all batches of transactions to the worker threads. /// Returns the number of transactions sent. fn send_batches(&mut self, batches: &mut Batches) -> Result { From 896bb3c883701e55a2e304561c302b8dd55d02be Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 22:36:46 +0000 Subject: [PATCH 07/19] select_thread --- .../prio_graph_scheduler.rs | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index a59d27123da46f..078a6ce3f7d2f4 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -2,7 +2,7 @@ use { super::{ in_flight_tracker::InFlightTracker, scheduler_error::SchedulerError, - thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId}, + thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet}, transaction_state_container::TransactionStateContainer, }, crate::banking_stage::{ @@ -62,6 +62,37 @@ impl PrioGraphScheduler { } } + /// Given the schedulable `thread_set`, select the thread with the least amount + /// of work queued up. + /// Currently, "work" is just defined as the number of transactions. + /// + /// If the `chain_thread` is available, this thread will be selected, regardless of + /// load-balancing. + /// + /// Panics if the `thread_set` is empty. 
+ fn select_thread( + thread_set: ThreadSet, + chain_thread: ThreadId, + batches_per_thread: &[Vec], + in_flight_per_thread: &[usize], + ) -> ThreadId { + if thread_set.contains(chain_thread) { + chain_thread + } else { + thread_set + .contained_threads_iter() + .map(|thread_id| { + ( + thread_id, + batches_per_thread[thread_id].len() + in_flight_per_thread[thread_id], + ) + }) + .min_by(|a, b| a.1.cmp(&b.1)) + .map(|(thread_id, _)| thread_id) + .unwrap() + } + } + /// Send all batches of transactions to the worker threads. /// Returns the number of transactions sent. fn send_batches(&mut self, batches: &mut Batches) -> Result { From ba8b2635124015ca5dbf6bbf7d228291ad30a061 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 22:44:15 +0000 Subject: [PATCH 08/19] TransactionStateContainer::pop --- .../transaction_scheduler/transaction_state_container.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index f5f80f30aceb40..76807653315117 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -57,6 +57,11 @@ impl TransactionStateContainer { self.priority_queue.capacity() - self.priority_queue.len() } + /// Get the top transaction id in the priority queue. + pub(crate) fn pop(&mut self) -> Option { + self.priority_queue.pop_max() + } + /// Get an iterator of the top `n` transaction ids in the priority queue. /// This will remove the ids from the queue, but not drain the remainder /// of the queue. @@ -64,7 +69,7 @@ impl TransactionStateContainer { &mut self, n: usize, ) -> impl Iterator + '_ { - (0..n).map_while(|_| self.priority_queue.pop_max()) + (0..n).map_while(|_| self.pop()) } /// Serialize entire priority queue. 
`hold` indicates whether the priority queue should From cd49ed48e4033fb3ba07b3ff775b88c5f3474576 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 23:14:22 +0000 Subject: [PATCH 09/19] schedule --- .../prio_graph_scheduler.rs | 237 +++++++++++++++--- .../transaction_priority_id.rs | 18 +- 2 files changed, 215 insertions(+), 40 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 078a6ce3f7d2f4..42732ecee14282 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -3,15 +3,19 @@ use { in_flight_tracker::InFlightTracker, scheduler_error::SchedulerError, thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet}, + transaction_state::SanitizedTransactionTTL, transaction_state_container::TransactionStateContainer, }, crate::banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, read_write_account_set::ReadWriteAccountSet, scheduler_messages::{ConsumeWork, TransactionBatchId, TransactionId}, + transaction_scheduler::transaction_priority_id::TransactionPriorityId, }, crossbeam_channel::Sender, - solana_sdk::{slot_history::Slot, transaction::SanitizedTransaction}, + prio_graph::{AccessKind, PrioGraph}, + solana_sdk::{pubkey::Pubkey, slot_history::Slot, transaction::SanitizedTransaction}, + std::collections::HashMap, }; pub(crate) struct PrioGraphScheduler { @@ -39,9 +43,149 @@ impl PrioGraphScheduler { /// not cause conflicts in the near future. pub(crate) fn schedule( &mut self, - _container: &mut TransactionStateContainer, + container: &mut TransactionStateContainer, ) -> Result { - todo!() + let num_threads = self.consume_work_senders.len(); + let mut batches = Batches::new(num_threads); + let mut chain_id_to_thread_index = HashMap::new(); + // Some transactions may be unschedulable due to multi-thread conflicts. + // These transactions cannot be scheduled until some conflicting work is completed. + // However, the scheduler should not allow other transactions that conflict with + // these transactions to be scheduled before them. + let mut unschedulable_ids = Vec::new(); + let mut blocking_locks = ReadWriteAccountSet::default(); + + // Number of transactions to keep `active` in the `PrioGraph` during scheduling. + // This only needs to be large enough to give the scheduler a reasonable idea if + // a transaction will conflict with another upcoming transaction. + const LOOK_AHEAD_WINDOW: usize = 2048; + + let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id); + + // Create the initial look-ahead window. + for _ in 0..LOOK_AHEAD_WINDOW { + let Some(id) = container.pop() else { + break; + }; + + let transaction = container.get_transaction_ttl(&id.id).unwrap(); + prio_graph.insert_transaction(id, Self::get_transaction_resource_access(transaction)); + } + + let mut unblock_this_batch = + Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH); + const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000; + let mut num_scheduled = 0; + while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. 
+ if prio_graph.is_empty() { + break; + } + + while let Some(id) = prio_graph.pop() { + unblock_this_batch.push(id); + + // Push next transaction from container into the `PrioGraph` look-ahead window. + if let Some(next_id) = container.pop() { + let transaction = container.get_transaction_ttl(&next_id.id).unwrap(); + prio_graph.insert_transaction( + next_id, + Self::get_transaction_resource_access(transaction), + ); + } + + if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + break; + } + + // Should always be in the container, but can just skip if it is not for some reason. + let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else { + continue; + }; + + let transaction = &transaction_state.transaction_ttl().transaction; + + // Check if this transaction conflicts with any blocked transactions + if !blocking_locks.check_locks(transaction.message()) { + blocking_locks.take_locks(transaction.message()); + unschedulable_ids.push(id); + continue; + } + + let maybe_chain_thread = chain_id_to_thread_index + .get(&prio_graph.chain_id(&id)) + .copied(); + + // Schedule the transaction if it can be. + let transaction_locks = transaction.get_account_locks_unchecked(); + let Some(thread_id) = self.account_locks.try_lock_accounts( + transaction_locks.writable.into_iter(), + transaction_locks.readonly.into_iter(), + ThreadSet::any(num_threads), + |thread_set| { + Self::select_thread( + thread_set, + maybe_chain_thread, + &batches.transactions, + self.in_flight_tracker.num_in_flight_per_thread(), + ) + }, + ) else { + blocking_locks.take_locks(transaction.message()); + unschedulable_ids.push(id); + continue; + }; + + // Transaction is scheduable and is assigned to `thread_id`. + num_scheduled += 1; + + // Track the chain-id to thread-index mapping. + chain_id_to_thread_index.insert(prio_graph.chain_id(&id), thread_id); + + let sanitized_transaction_ttl = transaction_state.transition_to_pending(); + let cu_limit = transaction_state + .transaction_priority_details() + .compute_unit_limit; + + let SanitizedTransactionTTL { + transaction, + max_age_slot, + } = sanitized_transaction_ttl; + + batches.transactions[thread_id].push(transaction); + batches.ids[thread_id].push(id.id); + batches.max_age_slots[thread_id].push(max_age_slot); + batches.total_cus[thread_id] += cu_limit; + + // If target batch size is reached, send only this batch. + if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH { + num_scheduled += self.send_batch(&mut batches, thread_id)?; + } + } + + // Send all non-empty batches + num_scheduled += self.send_batches(&mut batches)?; + + // Unblock all transactions that were blocked by the transactions that were just sent. + for id in unblock_this_batch.drain(..) { + prio_graph.unblock(&id); + } + } + + // Send batches for any remaining transactions + num_scheduled += self.send_batches(&mut batches)?; + + // Push unschedulable ids back into the container + for id in unschedulable_ids { + container.push_id_into_queue(id); + } + + // Push remaining transactions back into the container + while let Some(id) = prio_graph.pop_and_unblock() { + container.push_id_into_queue(id); + } + + Ok(num_scheduled) } /// Mark a given `TransactionBatchId` as completed. @@ -62,37 +206,6 @@ impl PrioGraphScheduler { } } - /// Given the schedulable `thread_set`, select the thread with the least amount - /// of work queued up. - /// Currently, "work" is just defined as the number of transactions. 
- /// - /// If the `chain_thread` is available, this thread will be selected, regardless of - /// load-balancing. - /// - /// Panics if the `thread_set` is empty. - fn select_thread( - thread_set: ThreadSet, - chain_thread: ThreadId, - batches_per_thread: &[Vec], - in_flight_per_thread: &[usize], - ) -> ThreadId { - if thread_set.contains(chain_thread) { - chain_thread - } else { - thread_set - .contained_threads_iter() - .map(|thread_id| { - ( - thread_id, - batches_per_thread[thread_id].len() + in_flight_per_thread[thread_id], - ) - }) - .min_by(|a, b| a.1.cmp(&b.1)) - .map(|(thread_id, _)| thread_id) - .unwrap() - } - } - /// Send all batches of transactions to the worker threads. /// Returns the number of transactions sent. fn send_batches(&mut self, batches: &mut Batches) -> Result { @@ -131,6 +244,57 @@ impl PrioGraphScheduler { Ok(num_scheduled) } + + /// Given the schedulable `thread_set`, select the thread with the least amount + /// of work queued up. + /// Currently, "work" is just defined as the number of transactions. + /// + /// If the `chain_thread` is available, this thread will be selected, regardless of + /// load-balancing. + /// + /// Panics if the `thread_set` is empty. + fn select_thread( + thread_set: ThreadSet, + chain_thread: Option, + batches_per_thread: &[Vec], + in_flight_per_thread: &[usize], + ) -> ThreadId { + if let Some(chain_thread) = chain_thread { + if thread_set.contains(chain_thread) { + return chain_thread; + } + } + + thread_set + .contained_threads_iter() + .map(|thread_id| { + ( + thread_id, + batches_per_thread[thread_id].len() + in_flight_per_thread[thread_id], + ) + }) + .min_by(|a, b| a.1.cmp(&b.1)) + .map(|(thread_id, _)| thread_id) + .unwrap() + } + + /// Gets accessed resources for use in `PrioGraph`. 
+ fn get_transaction_resource_access( + transaction: &SanitizedTransactionTTL, + ) -> impl Iterator + '_ { + let message = transaction.transaction.message(); + message + .account_keys() + .iter() + .enumerate() + .map(|(index, key)| { + if message.is_writable(index) { + (*key, AccessKind::Write) + } else { + (*key, AccessKind::Read) + } + }) + } } struct Batches { @@ -138,7 +302,6 @@ struct Batches { transactions: Vec>, max_age_slots: Vec>, total_cus: Vec, - locks: Vec, } impl Batches { @@ -148,9 +311,6 @@ impl Batches { transactions: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], max_age_slots: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], total_cus: vec![0; num_threads], - locks: std::iter::repeat_with(ReadWriteAccountSet::default) - .take(num_threads) - .collect(), } } @@ -163,7 +323,6 @@ impl Batches { Vec, u64, ) { - self.locks[thread_id].clear(); ( core::mem::replace( &mut self.ids[thread_id], diff --git a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs index 178a9cdf582d5f..74b7105bd1d1e5 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs @@ -1,4 +1,8 @@ -use crate::banking_stage::scheduler_messages::TransactionId; +use { + crate::banking_stage::scheduler_messages::TransactionId, + prio_graph::TopLevelId, + std::hash::{Hash, Hasher}, +}; /// A unique identifier tied with priority ordering for a transaction/packet: /// - `id` has no effect on ordering @@ -25,3 +29,15 @@ impl PartialOrd for TransactionPriorityId { Some(self.cmp(other)) } } + +impl Hash for TransactionPriorityId { + fn hash(&self, state: &mut H) { + self.id.hash(state) + } +} + +impl TopLevelId for TransactionPriorityId { + fn id(&self) -> Self { + *self + } +} From dfdc9a838f6f0ee5b9beb45e40fa92451cdf40e5 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 23:32:52 +0000 Subject: [PATCH 10/19] Add schedule tests --- .../prio_graph_scheduler.rs | 244 +++++++++++++++++- 1 file changed, 241 insertions(+), 3 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 42732ecee14282..9cab72a2eca369 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -136,9 +136,6 @@ impl PrioGraphScheduler { continue; }; - // Transaction is scheduable and is assigned to `thread_id`. - num_scheduled += 1; - // Track the chain-id to thread-index mapping. chain_id_to_thread_index.insert(prio_graph.chain_id(&id), thread_id); @@ -340,3 +337,244 @@ impl Batches { ) } } + +#[cfg(test)] +mod tests { + use { + super::*, + crate::banking_stage::consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + crossbeam_channel::{unbounded, Receiver}, + itertools::Itertools, + solana_runtime::transaction_priority_details::TransactionPriorityDetails, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, pubkey::Pubkey, + signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, + }, + std::borrow::Borrow, + }; + + macro_rules! txid { + ($value:expr) => { + TransactionId::new($value) + }; + } + + macro_rules! 
txids { + ([$($element:expr),*]) => { + vec![ $(txid!($element)),* ] + }; + } + + fn create_test_frame(num_threads: usize) -> (PrioGraphScheduler, Vec>) { + let (consume_work_senders, consume_work_receivers) = + (0..num_threads).map(|_| unbounded()).unzip(); + let scheduler = PrioGraphScheduler::new(consume_work_senders); + (scheduler, consume_work_receivers) + } + + fn prioritized_tranfers( + from_keypair: &Keypair, + to_pubkeys: impl IntoIterator>, + lamports: u64, + priority: u64, + ) -> SanitizedTransaction { + let to_pubkeys_lamports = to_pubkeys + .into_iter() + .map(|pubkey| *pubkey.borrow()) + .zip(std::iter::repeat(lamports)) + .collect_vec(); + let mut ixs = + system_instruction::transfer_many(&from_keypair.pubkey(), &to_pubkeys_lamports); + let prioritization = ComputeBudgetInstruction::set_compute_unit_price(priority); + ixs.push(prioritization); + let message = Message::new(&ixs, Some(&from_keypair.pubkey())); + let tx = Transaction::new(&[from_keypair], message, Hash::default()); + SanitizedTransaction::from_transaction_for_tests(tx) + } + + fn create_container( + tx_infos: impl IntoIterator< + Item = ( + impl Borrow, + impl IntoIterator>, + u64, + u64, + ), + >, + ) -> TransactionStateContainer { + let mut container = TransactionStateContainer::with_capacity(10 * 1024); + for (index, (from_keypair, to_pubkeys, lamports, priority)) in + tx_infos.into_iter().enumerate() + { + let id = TransactionId::new(index as u64); + let transaction = + prioritized_tranfers(from_keypair.borrow(), to_pubkeys, lamports, priority); + let transaction_ttl = SanitizedTransactionTTL { + transaction, + max_age_slot: Slot::MAX, + }; + container.insert_new_transaction( + id, + transaction_ttl, + TransactionPriorityDetails { + priority, + compute_unit_limit: 1, + }, + ); + } + + container + } + + fn collect_work( + receiver: &Receiver, + ) -> (Vec, Vec>) { + receiver + .try_iter() + .map(|work| { + let ids = work.ids.clone(); + (work, ids) + }) + .unzip() + } + + #[test] + fn test_schedule_disconnected_channel() { + let (mut scheduler, work_receivers) = create_test_frame(1); + let mut container = create_container([(&Keypair::new(), &[Pubkey::new_unique()], 1, 1)]); + + drop(work_receivers); // explicitly drop receivers + assert_matches!( + scheduler.schedule(&mut container), + Err(SchedulerError::DisconnectedSendChannel(_)) + ); + } + + #[test] + fn test_schedule_single_threaded_no_conflicts() { + let (mut scheduler, work_receivers) = create_test_frame(1); + let mut container = create_container([ + (&Keypair::new(), &[Pubkey::new_unique()], 1, 1), + (&Keypair::new(), &[Pubkey::new_unique()], 2, 2), + ]); + + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 2); + assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]); + } + + #[test] + fn test_schedule_single_threaded_conflict() { + let (mut scheduler, work_receivers) = create_test_frame(1); + let pubkey = Pubkey::new_unique(); + let mut container = create_container([ + (&Keypair::new(), &[pubkey], 1, 1), + (&Keypair::new(), &[pubkey], 1, 2), + ]); + + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 2); + assert_eq!( + collect_work(&work_receivers[0]).1, + vec![txids!([1]), txids!([0])] + ); + } + + #[test] + fn test_schedule_consume_single_threaded_multi_batch() { + let (mut scheduler, work_receivers) = create_test_frame(1); + let mut container = create_container( + (0..4 * TARGET_NUM_TRANSACTIONS_PER_BATCH) + .map(|i| (Keypair::new(), 
[Pubkey::new_unique()], i as u64, 1)), + ); + + // expect 4 full batches to be scheduled + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 4 * TARGET_NUM_TRANSACTIONS_PER_BATCH); + + let thread0_work_counts: Vec<_> = work_receivers[0] + .try_iter() + .map(|work| work.ids.len()) + .collect(); + assert_eq!(thread0_work_counts, [TARGET_NUM_TRANSACTIONS_PER_BATCH; 4]); + } + + #[test] + fn test_schedule_simple_thread_selection() { + let (mut scheduler, work_receivers) = create_test_frame(2); + let mut container = + create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i))); + + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 4); + assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]); + assert_eq!(collect_work(&work_receivers[1]).1, [txids!([2, 0])]); + } + + #[test] + fn test_schedule_look_ahead() { + let (mut scheduler, work_receivers) = create_test_frame(2); + + let accounts = (0..4).map(|_| Keypair::new()).collect_vec(); + let mut container = create_container([ + (&accounts[0], &[accounts[1].pubkey()], 1, 2), + (&accounts[2], &[accounts[3].pubkey()], 1, 1), + (&accounts[1], &[accounts[2].pubkey()], 1, 0), + ]); + + // high priority transactions [0, 1] do not conflict, and would be + // scheduled to *different* threads without chain-id look-ahead. + // Because low priority transaction [2] conflicts with both, it will + // cause transaction [1] to be scheduled onto the same thread as + // transaction [0]. + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 3); + assert_eq!( + collect_work(&work_receivers[0]).1, + [txids!([0, 1]), txids!([2])] + ); + } + + // #[test] + // fn test_schedule_priority_guard() { + // let (mut scheduler, work_receivers) = create_test_frame(2); + + // let accounts = (0..6).map(|_| Keypair::new()).collect_vec(); + // let mut container = create_container([ + // (&accounts[0], vec![accounts[1].pubkey()], 1, 3), + // (&accounts[2], vec![accounts[3].pubkey()], 1, 2), + // ( + // &accounts[1], + // vec![accounts[2].pubkey(), accounts[4].pubkey()], + // 1, + // 1, + // ), + // (&accounts[4], vec![accounts[5].pubkey()], 1, 0), + // ]); + + // // high priority transactions [0, 1] do not conflict, and would be + // // scheduled to *different* threads without chain-id look-ahead. + // // low priority transaction [2] conflicts with both, and thus will + // // cause transaction [1] to be scheduled onto the same thread as + // // transaction [0]. + // let num_scheduled = scheduler.schedule(&mut container).unwrap(); + // assert_eq!(num_scheduled, 2); + // let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); + // assert_eq!(thread_0_ids, [txids!([0])]); + // assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1])]); + + // // Cannot schedule even on next pass because of lock conflicts + // let num_scheduled = scheduler.schedule(&mut container).unwrap(); + // assert_eq!(num_scheduled, 0); + + // // Complete batch on thread 0. 
Remaining txs can be scheduled onto thread 1 + // scheduler.complete_batch(thread_0_work[0].batch_id, &thread_0_work[0].transactions); + // let num_scheduled = scheduler.schedule(&mut container).unwrap(); + // assert_eq!(num_scheduled, 2); + + // assert_eq!( + // collect_work(&work_receivers[1]).1, + // [txids!([2]), txids!([3])] + // ); + // } +} From 6443dc4a9ebab3f0a521ecdb8cd91bfb9a108747 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 23:51:37 +0000 Subject: [PATCH 11/19] Priority guard test --- .../prio_graph_scheduler.rs | 95 +++++++++---------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 9cab72a2eca369..160a6ae5b5ae56 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -22,6 +22,7 @@ pub(crate) struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec>, + look_ahead_window_size: usize, } impl PrioGraphScheduler { @@ -31,6 +32,7 @@ impl PrioGraphScheduler { in_flight_tracker: InFlightTracker::new(num_threads), account_locks: ThreadAwareAccountLocks::new(num_threads), consume_work_senders, + look_ahead_window_size: 2048, } } @@ -54,16 +56,10 @@ impl PrioGraphScheduler { // these transactions to be scheduled before them. let mut unschedulable_ids = Vec::new(); let mut blocking_locks = ReadWriteAccountSet::default(); - - // Number of transactions to keep `active` in the `PrioGraph` during scheduling. - // This only needs to be large enough to give the scheduler a reasonable idea if - // a transaction will conflict with another upcoming transaction. - const LOOK_AHEAD_WINDOW: usize = 2048; - let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id); // Create the initial look-ahead window. - for _ in 0..LOOK_AHEAD_WINDOW { + for _ in 0..self.look_ahead_window_size { let Some(id) = container.pop() else { break; }; @@ -535,46 +531,47 @@ mod tests { ); } - // #[test] - // fn test_schedule_priority_guard() { - // let (mut scheduler, work_receivers) = create_test_frame(2); - - // let accounts = (0..6).map(|_| Keypair::new()).collect_vec(); - // let mut container = create_container([ - // (&accounts[0], vec![accounts[1].pubkey()], 1, 3), - // (&accounts[2], vec![accounts[3].pubkey()], 1, 2), - // ( - // &accounts[1], - // vec![accounts[2].pubkey(), accounts[4].pubkey()], - // 1, - // 1, - // ), - // (&accounts[4], vec![accounts[5].pubkey()], 1, 0), - // ]); - - // // high priority transactions [0, 1] do not conflict, and would be - // // scheduled to *different* threads without chain-id look-ahead. - // // low priority transaction [2] conflicts with both, and thus will - // // cause transaction [1] to be scheduled onto the same thread as - // // transaction [0]. - // let num_scheduled = scheduler.schedule(&mut container).unwrap(); - // assert_eq!(num_scheduled, 2); - // let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); - // assert_eq!(thread_0_ids, [txids!([0])]); - // assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1])]); - - // // Cannot schedule even on next pass because of lock conflicts - // let num_scheduled = scheduler.schedule(&mut container).unwrap(); - // assert_eq!(num_scheduled, 0); - - // // Complete batch on thread 0. 
Remaining txs can be scheduled onto thread 1 - // scheduler.complete_batch(thread_0_work[0].batch_id, &thread_0_work[0].transactions); - // let num_scheduled = scheduler.schedule(&mut container).unwrap(); - // assert_eq!(num_scheduled, 2); - - // assert_eq!( - // collect_work(&work_receivers[1]).1, - // [txids!([2]), txids!([3])] - // ); - // } + #[test] + fn test_schedule_priority_guard() { + let (mut scheduler, work_receivers) = create_test_frame(2); + // intentionally shorten the look-ahead window to cause unschedulable conflicts + scheduler.look_ahead_window_size = 2; + + let accounts = (0..8).map(|_| Keypair::new()).collect_vec(); + let mut container = create_container([ + (&accounts[0], &[accounts[1].pubkey()], 1, 6), + (&accounts[2], &[accounts[3].pubkey()], 1, 5), + (&accounts[4], &[accounts[5].pubkey()], 1, 4), + (&accounts[6], &[accounts[7].pubkey()], 1, 3), + (&accounts[1], &[accounts[2].pubkey()], 1, 2), + (&accounts[2], &[accounts[3].pubkey()], 1, 1), + ]); + + // high priority transactions [0, 1, 2, 3] do not conflict, and are + // scheduled onto threads in a round-robin fashion. + // The look-ahead window is intentionally shortened, which leads to + // transaction [4] being unschedulable due to conflicts with [0] and [1], + // which were scheduled to different threads. + // Transaction [5] is technically schedulable, but will not be scheduled + // because it conflicts with [4]. + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 4); + let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); + assert_eq!(thread_0_ids, [txids!([0, 2])]); + assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1, 3])]); + + // Cannot schedule even on next pass because of lock conflicts + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 0); + + // Complete batch on thread 0. 
Remaining txs can be scheduled onto thread 1 + scheduler.complete_batch(thread_0_work[0].batch_id, &thread_0_work[0].transactions); + let num_scheduled = scheduler.schedule(&mut container).unwrap(); + assert_eq!(num_scheduled, 2); + + assert_eq!( + collect_work(&work_receivers[1]).1, + [txids!([4]), txids!([5])] + ); + } } From 9e704c944e7908d34503ff4a12451fccd6b13908 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 9 Oct 2023 23:59:40 +0000 Subject: [PATCH 12/19] programs cargo lock --- programs/sbf/Cargo.lock | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 25c3c36583bf4a..0001f38c42de88 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -3459,6 +3459,12 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "prio-graph" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78dd2fa9ca0901b4d0dbf51d9862d7e3fb004605e4f4b4132472c3d08e7d901b" + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -4790,6 +4796,7 @@ dependencies = [ "lru", "min-max-heap", "num_enum 0.7.0", + "prio-graph", "quinn", "rand 0.8.5", "rand_chacha 0.3.1", From 0042813b5fbd04e865e8d60bf0009b80440df4e1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 19 Oct 2023 05:39:02 +0000 Subject: [PATCH 13/19] PrioGraph owns Receiver --- .../prio_graph_scheduler.rs | 105 +++++++++++++++--- .../transaction_scheduler/scheduler_error.rs | 2 + 2 files changed, 93 insertions(+), 14 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 160a6ae5b5ae56..219649ac4bc8f9 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -9,10 +9,11 @@ use { crate::banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, read_write_account_set::ReadWriteAccountSet, - scheduler_messages::{ConsumeWork, TransactionBatchId, TransactionId}, + scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId}, transaction_scheduler::transaction_priority_id::TransactionPriorityId, }, - crossbeam_channel::Sender, + crossbeam_channel::{Receiver, Sender, TryRecvError}, + itertools::izip, prio_graph::{AccessKind, PrioGraph}, solana_sdk::{pubkey::Pubkey, slot_history::Slot, transaction::SanitizedTransaction}, std::collections::HashMap, @@ -22,16 +23,21 @@ pub(crate) struct PrioGraphScheduler { in_flight_tracker: InFlightTracker, account_locks: ThreadAwareAccountLocks, consume_work_senders: Vec>, + finished_consume_work_receiver: Receiver, look_ahead_window_size: usize, } impl PrioGraphScheduler { - pub(crate) fn new(consume_work_senders: Vec>) -> Self { + pub(crate) fn new( + consume_work_senders: Vec>, + finished_consume_work_receiver: Receiver, + ) -> Self { let num_threads = consume_work_senders.len(); Self { in_flight_tracker: InFlightTracker::new(num_threads), account_locks: ThreadAwareAccountLocks::new(num_threads), consume_work_senders, + finished_consume_work_receiver, look_ahead_window_size: 2048, } } @@ -181,9 +187,68 @@ impl PrioGraphScheduler { Ok(num_scheduled) } + /// Receive completed batches of transactions without blocking. + pub fn receive_completed( + &mut self, + container: &mut TransactionStateContainer, + ) -> Result<(), SchedulerError> { + while self.try_receive_completed(container)? 
{} + Ok(()) + } + + /// Receive completed batches of transactions. + /// Returns `Ok(true)` if a batch was received, `Ok(false)` if no batch was received. + fn try_receive_completed( + &mut self, + container: &mut TransactionStateContainer, + ) -> Result { + match self.finished_consume_work_receiver.try_recv() { + Ok(FinishedConsumeWork { + work: + ConsumeWork { + batch_id, + ids, + transactions, + max_age_slots, + }, + retryable_indexes, + }) => { + // Free the locks + self.complete_batch(batch_id, &transactions); + + // Retryable transactions should be inserted back into the container + let mut retryable_iter = retryable_indexes.into_iter().peekable(); + for (index, (id, transaction, max_age_slot)) in + izip!(ids, transactions, max_age_slots).enumerate() + { + if let Some(retryable_index) = retryable_iter.peek() { + if *retryable_index == index { + container.retry_transaction( + id, + SanitizedTransactionTTL { + transaction, + max_age_slot, + }, + ); + retryable_iter.next(); + continue; + } + } + container.remove_by_id(&id); + } + + Ok(true) + } + Err(TryRecvError::Empty) => Ok(false), + Err(TryRecvError::Disconnected) => Err(SchedulerError::DisconnectedRecvChannel( + "finished consume work", + )), + } + } + /// Mark a given `TransactionBatchId` as completed. /// This will update the internal tracking, including account locks. - pub(crate) fn complete_batch( + fn complete_batch( &mut self, batch_id: TransactionBatchId, transactions: &[SanitizedTransaction], @@ -361,11 +426,23 @@ mod tests { }; } - fn create_test_frame(num_threads: usize) -> (PrioGraphScheduler, Vec>) { + fn create_test_frame( + num_threads: usize, + ) -> ( + PrioGraphScheduler, + Vec>, + Sender, + ) { let (consume_work_senders, consume_work_receivers) = (0..num_threads).map(|_| unbounded()).unzip(); - let scheduler = PrioGraphScheduler::new(consume_work_senders); - (scheduler, consume_work_receivers) + let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); + let scheduler = + PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver); + ( + scheduler, + consume_work_receivers, + finished_consume_work_sender, + ) } fn prioritized_tranfers( @@ -436,7 +513,7 @@ mod tests { #[test] fn test_schedule_disconnected_channel() { - let (mut scheduler, work_receivers) = create_test_frame(1); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); let mut container = create_container([(&Keypair::new(), &[Pubkey::new_unique()], 1, 1)]); drop(work_receivers); // explicitly drop receivers @@ -448,7 +525,7 @@ mod tests { #[test] fn test_schedule_single_threaded_no_conflicts() { - let (mut scheduler, work_receivers) = create_test_frame(1); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); let mut container = create_container([ (&Keypair::new(), &[Pubkey::new_unique()], 1, 1), (&Keypair::new(), &[Pubkey::new_unique()], 2, 2), @@ -461,7 +538,7 @@ mod tests { #[test] fn test_schedule_single_threaded_conflict() { - let (mut scheduler, work_receivers) = create_test_frame(1); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); let pubkey = Pubkey::new_unique(); let mut container = create_container([ (&Keypair::new(), &[pubkey], 1, 1), @@ -478,7 +555,7 @@ mod tests { #[test] fn test_schedule_consume_single_threaded_multi_batch() { - let (mut scheduler, work_receivers) = create_test_frame(1); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); let mut container = 
create_container( (0..4 * TARGET_NUM_TRANSACTIONS_PER_BATCH) .map(|i| (Keypair::new(), [Pubkey::new_unique()], i as u64, 1)), @@ -497,7 +574,7 @@ mod tests { #[test] fn test_schedule_simple_thread_selection() { - let (mut scheduler, work_receivers) = create_test_frame(2); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(2); let mut container = create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i))); @@ -509,7 +586,7 @@ mod tests { #[test] fn test_schedule_look_ahead() { - let (mut scheduler, work_receivers) = create_test_frame(2); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(2); let accounts = (0..4).map(|_| Keypair::new()).collect_vec(); let mut container = create_container([ @@ -533,7 +610,7 @@ mod tests { #[test] fn test_schedule_priority_guard() { - let (mut scheduler, work_receivers) = create_test_frame(2); + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(2); // intentionally shorten the look-ahead window to cause unschedulable conflicts scheduler.look_ahead_window_size = 2; diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs index 50715a19e4b7ef..9b8d4015448e57 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_error.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_error.rs @@ -4,4 +4,6 @@ use thiserror::Error; pub enum SchedulerError { #[error("Sending channel disconnected: {0}")] DisconnectedSendChannel(&'static str), + #[error("Recv channel disconnected: {0}")] + DisconnectedRecvChannel(&'static str), } From 4bc39132176fdaeeca44125814a286720450295e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 19 Oct 2023 05:45:46 +0000 Subject: [PATCH 14/19] Convert priority_guard test to use receive_completed --- .../transaction_scheduler/prio_graph_scheduler.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 219649ac4bc8f9..d41aff506042bd 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -610,7 +610,7 @@ mod tests { #[test] fn test_schedule_priority_guard() { - let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(2); + let (mut scheduler, work_receivers, finished_work_sender) = create_test_frame(2); // intentionally shorten the look-ahead window to cause unschedulable conflicts scheduler.look_ahead_window_size = 2; @@ -642,7 +642,13 @@ mod tests { assert_eq!(num_scheduled, 0); // Complete batch on thread 0. 
Remaining txs can be scheduled onto thread 1 - scheduler.complete_batch(thread_0_work[0].batch_id, &thread_0_work[0].transactions); + finished_work_sender + .send(FinishedConsumeWork { + work: thread_0_work.into_iter().next().unwrap(), + retryable_indexes: vec![], + }) + .unwrap(); + scheduler.receive_completed(&mut container).unwrap(); let num_scheduled = scheduler.schedule(&mut container).unwrap(); assert_eq!(num_scheduled, 2); From 40f292c69954f8f516c755c8ad3e051026ecfa21 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 20 Oct 2023 01:21:05 +0000 Subject: [PATCH 15/19] Updated test comment --- .../transaction_scheduler/prio_graph_scheduler.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index d41aff506042bd..de0f945ac60351 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -624,13 +624,13 @@ mod tests { (&accounts[2], &[accounts[3].pubkey()], 1, 1), ]); - // high priority transactions [0, 1, 2, 3] do not conflict, and are - // scheduled onto threads in a round-robin fashion. - // The look-ahead window is intentionally shortened, which leads to - // transaction [4] being unschedulable due to conflicts with [0] and [1], - // which were scheduled to different threads. - // Transaction [5] is technically schedulable, but will not be scheduled - // because it conflicts with [4]. + // The look-ahead window is intentionally shortened, high priority transactions + // [0, 1, 2, 3] do not conflict, and are scheduled onto threads in a + // round-robin fashion. This leads to transaction [4] being unschedulable due + // to conflicts with [0] and [1], which were scheduled to different threads. + // Transaction [5] is technically schedulable, onto thread 1 since it only + // conflicts with transaction [1]. However, [5] will not be scheduled because + // it conflicts with a higher-priority transaction [4] that is unschedulable. 
let num_scheduled = scheduler.schedule(&mut container).unwrap(); assert_eq!(num_scheduled, 4); let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); From e40749aa9aa0c2d6d6e1a3d9084be68ef5523c48 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 20 Oct 2023 01:35:35 +0000 Subject: [PATCH 16/19] Expand test case and add clarifying comments --- .../prio_graph_scheduler.rs | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index de0f945ac60351..941d91fa8fd0fa 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -588,23 +588,32 @@ mod tests { fn test_schedule_look_ahead() { let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(2); - let accounts = (0..4).map(|_| Keypair::new()).collect_vec(); + let accounts = (0..6).map(|_| Keypair::new()).collect_vec(); let mut container = create_container([ - (&accounts[0], &[accounts[1].pubkey()], 1, 2), - (&accounts[2], &[accounts[3].pubkey()], 1, 1), - (&accounts[1], &[accounts[2].pubkey()], 1, 0), + (&accounts[0], &[accounts[1].pubkey()], 1, 4), + (&accounts[1], &[accounts[2].pubkey()], 1, 3), + (&accounts[3], &[accounts[4].pubkey()], 1, 2), + (&accounts[4], &[accounts[5].pubkey()], 1, 1), + (&accounts[2], &[accounts[5].pubkey()], 1, 0), ]); - // high priority transactions [0, 1] do not conflict, and would be - // scheduled to *different* threads without chain-id look-ahead. - // Because low priority transaction [2] conflicts with both, it will - // cause transaction [1] to be scheduled onto the same thread as - // transaction [0]. + // The look-ahead window allows the prio-graph to have a limited view of + // upcoming transactions, so that un-schedulable transactions are less + // likely to occur. In this case, we have 5 transactions that have a + // prio-graph that can be visualized as: + // [0] --> [1] \ + // -> [4] + // / + // [2] --> [3] + // Even though [0] and [2] could be scheduled to different threads, the + // fact they eventually join means that the scheduler will schedule them + // onto the same thread to avoid causing [4], which conflicts with both + // chains, to be un-schedulable. let num_scheduled = scheduler.schedule(&mut container).unwrap(); - assert_eq!(num_scheduled, 3); + assert_eq!(num_scheduled, 5); assert_eq!( collect_work(&work_receivers[0]).1, - [txids!([0, 1]), txids!([2])] + [txids!([0, 2]), txids!([1, 3]), txids!([4])] ); } @@ -631,6 +640,15 @@ mod tests { // Transaction [5] is technically schedulable, onto thread 1 since it only // conflicts with transaction [1]. However, [5] will not be scheduled because // it conflicts with a higher-priority transaction [4] that is unschedulable. + // The full prio-graph can be visualized as: + // [0] \ + // -> [4] -> [5] + // [1] / ------/ + // [2] + // [3] + // Because the look-ahead window is shortened to a size of 4, the scheduler does + // not have knowledge of the joining at transaction [4] until after [0] and [1] + // have been scheduled. 
let num_scheduled = scheduler.schedule(&mut container).unwrap(); assert_eq!(num_scheduled, 4); let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); From 1935566ebf3eb117e2675c2577a763679ab45338 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 20 Oct 2023 01:36:03 +0000 Subject: [PATCH 17/19] get_transaction_account_access --- .../transaction_scheduler/prio_graph_scheduler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 941d91fa8fd0fa..d0830aa480af05 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -71,7 +71,7 @@ impl PrioGraphScheduler { }; let transaction = container.get_transaction_ttl(&id.id).unwrap(); - prio_graph.insert_transaction(id, Self::get_transaction_resource_access(transaction)); + prio_graph.insert_transaction(id, Self::get_transaction_account_access(transaction)); } let mut unblock_this_batch = @@ -92,7 +92,7 @@ impl PrioGraphScheduler { let transaction = container.get_transaction_ttl(&next_id.id).unwrap(); prio_graph.insert_transaction( next_id, - Self::get_transaction_resource_access(transaction), + Self::get_transaction_account_access(transaction), ); } @@ -336,8 +336,8 @@ impl PrioGraphScheduler { .unwrap() } - /// Gets accessed resources for use in `PrioGraph`. - fn get_transaction_resource_access( + /// Gets accessed accounts (resources) for use in `PrioGraph`. + fn get_transaction_account_access( transaction: &SanitizedTransactionTTL, ) -> impl Iterator + '_ { let message = transaction.transaction.message(); From c0fbeba29e953bfaf184fa1c2be7da3843cbaa21 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 20 Oct 2023 01:59:16 +0000 Subject: [PATCH 18/19] num_scheduled check move --- .../prio_graph_scheduler.rs | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index d0830aa480af05..ee76f20d777ebd 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -78,6 +78,7 @@ impl PrioGraphScheduler { Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH); const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000; let mut num_scheduled = 0; + let mut num_sent = 0; while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS { // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. if prio_graph.is_empty() { @@ -96,10 +97,6 @@ impl PrioGraphScheduler { ); } - if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { - break; - } - // Should always be in the container, but can just skip if it is not for some reason. let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else { continue; @@ -138,6 +135,8 @@ impl PrioGraphScheduler { continue; }; + num_scheduled += 1; + // Track the chain-id to thread-index mapping. chain_id_to_thread_index.insert(prio_graph.chain_id(&id), thread_id); @@ -158,12 +157,16 @@ impl PrioGraphScheduler { // If target batch size is reached, send only this batch. 
if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH { - num_scheduled += self.send_batch(&mut batches, thread_id)?; + num_sent += self.send_batch(&mut batches, thread_id)?; + } + + if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + break; } } // Send all non-empty batches - num_scheduled += self.send_batches(&mut batches)?; + num_sent += self.send_batches(&mut batches)?; // Unblock all transactions that were blocked by the transactions that were just sent. for id in unblock_this_batch.drain(..) { @@ -172,7 +175,7 @@ impl PrioGraphScheduler { } // Send batches for any remaining transactions - num_scheduled += self.send_batches(&mut batches)?; + num_sent += self.send_batches(&mut batches)?; // Push unschedulable ids back into the container for id in unschedulable_ids { @@ -184,6 +187,11 @@ impl PrioGraphScheduler { container.push_id_into_queue(id); } + assert_eq!( + num_scheduled, num_sent, + "number of scheduled and sent transactions must match" + ); + Ok(num_scheduled) } From 96c77517512ef19f2596a17b325bfc2b814889b3 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 20 Oct 2023 02:19:40 +0000 Subject: [PATCH 19/19] panic on missing transaction state --- .../transaction_scheduler/prio_graph_scheduler.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index ee76f20d777ebd..23e15562e1ae54 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -97,9 +97,10 @@ impl PrioGraphScheduler { ); } - // Should always be in the container, but can just skip if it is not for some reason. + // Should always be in the container, during initial testing phase panic. + // Later, we can replace with a continue in case this does happen. let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else { - continue; + panic!("transaction state must exist") }; let transaction = &transaction_state.transaction_ttl().transaction;
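
The thread-selection heuristic added in PATCH 07 and generalized in PATCH 09 (`select_thread`) is the core of the scheduler's load balancing: if the transaction's chain is already pinned to a schedulable thread, that thread is reused; otherwise the schedulable thread with the least queued plus in-flight work wins. Below is a minimal standalone sketch of that heuristic, not part of the patch series itself: plain slices of thread ids and counts stand in for the patches' ThreadSet, Batches, and InFlightTracker, and the names select_thread_sketch / main are illustrative only.

    // Standalone sketch of the scheduler's thread-selection heuristic:
    // prefer the thread already holding the transaction's chain (when it is
    // schedulable), otherwise pick the schedulable thread with the least
    // queued + in-flight work.
    fn select_thread_sketch(
        schedulable: &[usize],          // candidate thread ids (stand-in for ThreadSet)
        chain_thread: Option<usize>,    // thread already processing this chain, if any
        queued_per_thread: &[usize],    // batched-but-unsent transaction counts per thread
        in_flight_per_thread: &[usize], // transactions already sent to each thread
    ) -> usize {
        if let Some(t) = chain_thread {
            if schedulable.contains(&t) {
                return t;
            }
        }
        *schedulable
            .iter()
            .min_by_key(|&&t| queued_per_thread[t] + in_flight_per_thread[t])
            .expect("thread set must not be empty")
    }

    fn main() {
        // Thread 1 holds the chain and is schedulable, so it is reused despite heavier load.
        assert_eq!(select_thread_sketch(&[0, 1], Some(1), &[0, 3], &[0, 2]), 1);
        // No chain affinity: the least-loaded schedulable thread (0) is chosen.
        assert_eq!(select_thread_sketch(&[0, 1], None, &[1, 3], &[0, 2]), 0);
    }

This mirrors why test_schedule_simple_thread_selection expects round-robin-like placement of non-conflicting transactions, while test_schedule_look_ahead expects conflicting chains to collapse onto one thread via the chain_thread preference.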