Skip to content

Commit

Permalink
Add InstalledScheduler for blockstore_processor (solana-labs#33875)
Browse files Browse the repository at this point in the history
* Add InstalledScheduler for blockstore_processor

* Reverse if clauses

* Add more comments for process_batches()

* Elaborate comment

* Simplify schedule_transaction_executions type
  • Loading branch information
ryoqun authored Oct 27, 2023
1 parent d04ad65 commit 950ca5e
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 41 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ memmap2 = "0.5.10"
memoffset = "0.9"
merlin = "3"
min-max-heap = "1.3.0"
mockall = "0.11.4"
modular-bitfield = "0.11.2"
nix = "0.26.4"
num-bigint = "0.4.4"
Expand Down
180 changes: 146 additions & 34 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,70 @@ fn execute_batches_internal(
})
}

// Forks the code path into two variants that must enforce exactly the same set of
// validations. This fn is deliberately placed here — called from inside
// process_entries() — so that Bank::prepare_sanitized_batch() has already run on
// every batch, minimizing code duplication (and therefore divergence risk) at the
// acceptable cost of pointlessly buffering batches for the scheduler variant.
//
// Note also that the scheduler variant cannot naively perform batch-level
// sanitization itself, because it processes transactions individually. That is a
// further reason for choosing this particular divergence point rather than one
// layer up with its own prepare_sanitized_batch() invocation.
fn process_batches(
    bank: &BankWithScheduler,
    batches: &[TransactionBatchWithIndexes],
    transaction_status_sender: Option<&TransactionStatusSender>,
    replay_vote_sender: Option<&ReplayVoteSender>,
    batch_execution_timing: &mut BatchExecutionTiming,
    log_messages_bytes_limit: Option<usize>,
    prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
    if !bank.has_installed_scheduler() {
        debug!(
            "process_batches()/rebatch_and_execute_batches({} batches)",
            batches.len()
        );
        return rebatch_and_execute_batches(
            bank,
            batches,
            transaction_status_sender,
            replay_vote_sender,
            batch_execution_timing,
            log_messages_bytes_limit,
            prioritization_fee_cache,
        );
    }

    debug!(
        "process_batches()/schedule_batches_for_execution({} batches)",
        batches.len()
    );
    // Scheduling always succeeds here without blocking on the actual transaction
    // executions; any execution errors are collected later via the blocking
    // BankWithScheduler::wait_for_completed_scheduler(), if any.
    schedule_batches_for_execution(bank, batches);
    Ok(())
}

fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
) {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
} in batches
{
bank.schedule_transaction_executions(
batch
.sanitized_transactions()
.iter()
.zip(transaction_indexes.iter()),
);
}
}

fn rebatch_transactions<'a>(
lock_results: &'a [Result<()>],
bank: &'a Arc<Bank>,
Expand All @@ -314,7 +378,7 @@ fn rebatch_transactions<'a>(
}
}

fn execute_batches(
fn rebatch_and_execute_batches(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
Expand Down Expand Up @@ -488,7 +552,7 @@ fn process_entries(
if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) {
// If it's a tick that will cause a new blockhash to be created,
// execute the group and register the tick
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
Expand Down Expand Up @@ -541,7 +605,7 @@ fn process_entries(
} else {
// else we have an entry that conflicts with a prior entry
// execute the current queue and try to process this entry again
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
Expand All @@ -556,7 +620,7 @@ fn process_entries(
}
}
}
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
Expand Down Expand Up @@ -1856,8 +1920,11 @@ pub mod tests {
rand::{thread_rng, Rng},
solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
solana_program_runtime::declare_process_instruction,
solana_runtime::genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
solana_runtime::{
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
Expand Down Expand Up @@ -4245,6 +4312,38 @@ pub mod tests {
)
}

// Builds three sanitized one-lamport system transfers (the first funded by
// `mint_keypair`, the rest by fresh keypairs), each to a random recipient and
// referencing `genesis_hash`.
fn create_test_transactions(
    mint_keypair: &Keypair,
    genesis_hash: &Hash,
) -> Vec<SanitizedTransaction> {
    // Helper: a single sanitized 1-lamport transfer to a random pubkey.
    let transfer = |from: &Keypair| {
        SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
            from,
            &solana_sdk::pubkey::new_rand(),
            1,
            *genesis_hash,
        ))
    };

    vec![
        transfer(mint_keypair),
        transfer(&Keypair::new()),
        transfer(&Keypair::new()),
    ]
}

#[test]
fn test_confirm_slot_entries_progress_num_txs_indexes() {
let GenesisConfigInfo {
Expand Down Expand Up @@ -4368,34 +4467,7 @@ pub mod tests {
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));

let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();

let txs = vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
genesis_config.hash(),
)),
];

let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let batch = bank.prepare_sanitized_batch(&txs);
assert!(batch.needs_unlock());
let transaction_indexes = vec![42, 43, 44];
Expand Down Expand Up @@ -4424,6 +4496,46 @@ pub mod tests {
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
}

// Verifies that process_batches() routes to the scheduler variant when one is
// installed: the mock must see exactly one schedule_execution() call per
// transaction, and scheduling itself must report success.
#[test]
fn test_schedule_batches_for_execution() {
    solana_logger::setup();
    let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
    let genesis_hash = genesis_config.hash();
    let txs = create_test_transactions(&mint_keypair, &genesis_hash);

    // Expect one scheduling call per transaction, each returning unit.
    let mut mocked_scheduler = MockInstalledScheduler::new();
    mocked_scheduler
        .expect_schedule_execution()
        .times(txs.len())
        .returning(|_| ());

    let bank = BankWithScheduler::new(
        Arc::new(Bank::new_for_tests(&genesis_config)),
        Some(Box::new(mocked_scheduler)),
    );

    let batch_with_indexes = TransactionBatchWithIndexes {
        batch: bank.prepare_sanitized_batch(&txs),
        transaction_indexes: (0..txs.len()).collect(),
    };

    let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
    let mut batch_execution_timing = BatchExecutionTiming::default();
    let result = process_batches(
        &bank,
        &[batch_with_indexes],
        None,
        None,
        &mut batch_execution_timing,
        None,
        &ignored_prioritization_fee_cache,
    );
    assert!(result.is_ok());
}

#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;
Expand Down
Loading

0 comments on commit 950ca5e

Please sign in to comment.