PrioGraphSchedulerConfig (#4064)
apfitzge authored Dec 13, 2024
1 parent 4afbc66 commit 29d4b57
Showing 3 changed files with 63 additions and 26 deletions.
7 changes: 6 additions & 1 deletion core/src/banking_stage.rs
@@ -51,6 +51,7 @@ use {
time::{Duration, Instant},
},
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
transaction_state_container::TransactionStateContainer,
},
@@ -618,7 +619,11 @@ impl BankingStage {
bank_forks.clone(),
forwarder.is_some(),
);
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler = PrioGraphScheduler::new(
work_senders,
finished_work_receiver,
PrioGraphSchedulerConfig::default(),
);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
receive_and_buffer,
core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -41,28 +41,47 @@ type SchedulerPrioGraph = PrioGraph<
fn(&TransactionPriorityId, &GraphNode<TransactionPriorityId>) -> TransactionPriorityId,
>;

pub(crate) struct PrioGraphSchedulerConfig {
pub max_scheduled_cus: u64,
pub max_transactions_per_scheduling_pass: usize,
pub look_ahead_window_size: usize,
pub target_transactions_per_batch: usize,
}

impl Default for PrioGraphSchedulerConfig {
fn default() -> Self {
Self {
max_scheduled_cus: MAX_BLOCK_UNITS,
max_transactions_per_scheduling_pass: 100_000,
look_ahead_window_size: 2048,
target_transactions_per_batch: TARGET_NUM_TRANSACTIONS_PER_BATCH,
}
}
}

pub(crate) struct PrioGraphScheduler<Tx> {
in_flight_tracker: InFlightTracker,
account_locks: ThreadAwareAccountLocks,
consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
look_ahead_window_size: usize,
prio_graph: SchedulerPrioGraph,
config: PrioGraphSchedulerConfig,
}

impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
pub(crate) fn new(
consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
config: PrioGraphSchedulerConfig,
) -> Self {
let num_threads = consume_work_senders.len();
Self {
in_flight_tracker: InFlightTracker::new(num_threads),
account_locks: ThreadAwareAccountLocks::new(num_threads),
consume_work_senders,
finished_consume_work_receiver,
look_ahead_window_size: 2048,
prio_graph: PrioGraph::new(passthrough_priority),
config,
}
}
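
Aside (not part of the diff): the config gathers limits that were previously hard-coded, so callers can now tune them per instantiation. A minimal sketch of overriding two fields while keeping the rest at their defaults; `work_senders` and `finished_work_receiver` stand in for crossbeam channels the caller already owns.

```rust
// Illustrative sketch only: tighten two limits, default the rest, and pass the
// config to the constructor introduced in this commit.
let config = PrioGraphSchedulerConfig {
    max_transactions_per_scheduling_pass: 10_000,
    look_ahead_window_size: 256,
    ..PrioGraphSchedulerConfig::default()
};
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver, config);
```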

@@ -89,7 +108,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
pre_lock_filter: impl Fn(&Tx) -> bool,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let max_cu_per_thread = MAX_BLOCK_UNITS / num_threads as u64;
let max_cu_per_thread = self.config.max_scheduled_cus / num_threads as u64;

let mut schedulable_threads = ThreadSet::any(num_threads);
for thread_id in 0..num_threads {
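
Aside: with the budget now read from the config, the per-thread compute limit is a plain integer division. A self-contained illustration; the 48_000_000 figure is an assumed value for MAX_BLOCK_UNITS, not something stated in this diff.

```rust
// Stand-alone sketch of the per-thread CU budget math.
fn main() {
    let max_scheduled_cus: u64 = 48_000_000; // assumed MAX_BLOCK_UNITS-like default
    let num_threads: usize = 4; // e.g. four consume-work threads
    let max_cu_per_thread = max_scheduled_cus / num_threads as u64;
    assert_eq!(max_cu_per_thread, 12_000_000);
}
```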
@@ -106,7 +125,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
});
}

let mut batches = Batches::new(num_threads);
let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
// Some transactions may be unschedulable due to multi-thread conflicts.
// These transactions cannot be scheduled until some conflicting work is completed.
// However, the scheduler should not allow other transactions that conflict with
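
Aside: the comment above describes the "priority guard". Below is a toy model of that rule under assumed semantics (none of these names come from the scheduler): once a transaction is held back, any account it would lock is treated as blocked, and later, lower-priority transactions touching those accounts are held back too rather than jumping ahead of it.

```rust
// Toy priority-guard sketch; the real scheduler uses the prio-graph and account locks.
use std::collections::HashSet;

fn main() {
    // (priority, accounts written), already sorted by descending priority.
    let txs = [(10, vec!["A", "B"]), (9, vec!["B", "C"]), (8, vec!["D"])];
    let mut blocked: HashSet<&str> = HashSet::new();
    let mut scheduled = Vec::new();
    for (i, (priority, accounts)) in txs.iter().enumerate() {
        // Pretend the top-priority tx hit a multi-thread conflict and is unschedulable.
        let unschedulable = i == 0 || accounts.iter().any(|a| blocked.contains(a));
        if unschedulable {
            blocked.extend(accounts.iter().copied());
        } else {
            scheduled.push(*priority);
        }
    }
    // Priority 9 conflicts with the blocked tx on account "B", so only priority 8 runs.
    assert_eq!(scheduled, vec![8]);
}
```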
@@ -118,7 +137,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
let mut num_filtered_out: usize = 0;
let mut total_filter_time_us: u64 = 0;

let mut window_budget = self.look_ahead_window_size;
let mut window_budget = self.config.look_ahead_window_size;
let mut chunked_pops = |container: &mut S,
prio_graph: &mut PrioGraph<_, _, _, _>,
window_budget: &mut usize| {
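
Aside: `window_budget` appears to cap how many transactions are pulled out of the container into the prio-graph per scheduling pass. A toy sketch of that budgeted, chunked pop under assumed semantics (the chunk size and queue are made up for illustration):

```rust
// Toy model of the look-ahead window: drain at most `look_ahead_window_size`
// ids from a priority-ordered queue, in fixed-size chunks.
fn main() {
    let look_ahead_window_size: usize = 2048;
    let mut window_budget = look_ahead_window_size;
    let mut queue: Vec<u64> = (0..10_000).collect(); // stand-in for the container
    let mut in_graph: Vec<u64> = Vec::new();
    const CHUNK: usize = 128; // assumed chunk size, not taken from the diff
    while window_budget > 0 && !queue.is_empty() {
        let take = CHUNK.min(window_budget).min(queue.len());
        in_graph.extend(queue.drain(..take));
        window_budget -= take;
    }
    assert_eq!(in_graph.len(), look_ahead_window_size);
}
```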
@@ -170,13 +189,13 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
// Check transactions against filter, remove from container if it fails.
chunked_pops(container, &mut self.prio_graph, &mut window_budget);

let mut unblock_this_batch =
Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000;
let mut unblock_this_batch = Vec::with_capacity(
self.consume_work_senders.len() * self.config.target_transactions_per_batch,
);
let mut num_scheduled: usize = 0;
let mut num_sent: usize = 0;
let mut num_unschedulable: usize = 0;
while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
while num_scheduled < self.config.max_transactions_per_scheduling_pass {
// If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
if self.prio_graph.is_empty() {
break;
@@ -229,7 +248,8 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
saturating_add_assign!(batches.total_cus[thread_id], cost);

// If target batch size is reached, send only this batch.
if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH {
if batches.ids[thread_id].len() >= self.config.target_transactions_per_batch
{
saturating_add_assign!(
num_sent,
self.send_batch(&mut batches, thread_id)?
@@ -248,7 +268,7 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
}
}

if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
if num_scheduled >= self.config.max_transactions_per_scheduling_pass {
break;
}
}
@@ -408,7 +428,8 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
return Ok(0);
}

let (ids, transactions, max_ages, total_cus) = batches.take_batch(thread_index);
let (ids, transactions, max_ages, total_cus) =
batches.take_batch(thread_index, self.config.target_transactions_per_batch);

let batch_id = self
.in_flight_tracker
@@ -498,34 +519,35 @@ struct Batches<Tx> {
}

impl<Tx> Batches<Tx> {
fn new(num_threads: usize) -> Self {
fn new(num_threads: usize, target_num_transactions_per_batch: usize) -> Self {
Self {
ids: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads],
ids: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads],

transactions: (0..num_threads)
.map(|_| Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH))
.map(|_| Vec::with_capacity(target_num_transactions_per_batch))
.collect(),
max_ages: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads],
max_ages: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads],
total_cus: vec![0; num_threads],
}
}

fn take_batch(
&mut self,
thread_id: ThreadId,
target_num_transactions_per_batch: usize,
) -> (Vec<TransactionId>, Vec<Tx>, Vec<MaxAge>, u64) {
(
core::mem::replace(
&mut self.ids[thread_id],
Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH),
Vec::with_capacity(target_num_transactions_per_batch),
),
core::mem::replace(
&mut self.transactions[thread_id],
Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH),
Vec::with_capacity(target_num_transactions_per_batch),
),
core::mem::replace(
&mut self.max_ages[thread_id],
Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH),
Vec::with_capacity(target_num_transactions_per_batch),
),
core::mem::replace(&mut self.total_cus[thread_id], 0),
)
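
Aside: `take_batch` (above) drains the accumulated per-thread batch and resets the slot in one move via `core::mem::replace`, pre-sizing the fresh vector with the configured target. A minimal, self-contained sketch of that drain-and-reset pattern with illustrative names:

```rust
// Sketch of the mem::replace drain-and-reset used per thread in `take_batch`.
fn main() {
    let target_num_transactions_per_batch: usize = 64;
    let mut ids: Vec<usize> = vec![7, 11, 13]; // pretend this thread's pending batch
    let taken = core::mem::replace(
        &mut ids,
        Vec::with_capacity(target_num_transactions_per_batch),
    );
    assert_eq!(taken, vec![7, 11, 13]); // the full batch moves out to be sent
    assert!(ids.is_empty()); // the slot is ready to accumulate the next batch
    assert!(ids.capacity() >= target_num_transactions_per_batch);
}
```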
@@ -605,7 +627,6 @@ mod tests {
use {
super::*,
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
immutable_deserialized_packet::ImmutableDeserializedPacket,
transaction_scheduler::transaction_state_container::TransactionStateContainer,
},
@@ -637,8 +658,11 @@ mod tests {
let (consume_work_senders, consume_work_receivers) =
(0..num_threads).map(|_| unbounded()).unzip();
let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
let scheduler =
PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver);
let scheduler = PrioGraphScheduler::new(
consume_work_senders,
finished_consume_work_receiver,
PrioGraphSchedulerConfig::default(),
);
(
scheduler,
consume_work_receivers,
@@ -821,7 +845,7 @@ mod tests {
fn test_schedule_priority_guard() {
let (mut scheduler, work_receivers, finished_work_sender) = create_test_frame(2);
// intentionally shorten the look-ahead window to cause unschedulable conflicts
scheduler.look_ahead_window_size = 2;
scheduler.config.look_ahead_window_size = 2;

let accounts = (0..8).map(|_| Keypair::new()).collect_vec();
let mut container = create_container([
core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -441,7 +441,10 @@ mod tests {
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
},
},
banking_trace::BankingPacketBatch,
},
@@ -549,11 +552,16 @@ mod tests {
false,
);

let scheduler = PrioGraphScheduler::new(
consume_work_senders,
finished_consume_work_receiver,
PrioGraphSchedulerConfig::default(),
);
let scheduler_controller = SchedulerController::new(
decision_maker,
receive_and_buffer,
bank_forks,
PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver),
scheduler,
vec![], // no actual workers with metrics to report, this can be empty
None,
);
