diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs
index d17be49ce6744c..46b3d0a47a9fd0 100644
--- a/program-runtime/src/loaded_programs.rs
+++ b/program-runtime/src/loaded_programs.rs
@@ -4,7 +4,7 @@ use {
         timings::ExecuteDetailsTimings,
     },
     itertools::Itertools,
-    log::{debug, error, log_enabled, trace},
+    log::{debug, log_enabled, trace},
     percentage::PercentageInteger,
     solana_measure::measure::Measure,
     solana_rbpf::{
@@ -658,14 +658,14 @@ impl LoadedPrograms {
 
     /// Before rerooting the blockstore this removes all superfluous entries
     pub fn prune(&mut self, new_root_slot: Slot, new_root_epoch: Epoch) {
-        let Some(fork_graph) = self.fork_graph.clone() else {
-            error!("Program cache doesn't have fork graph.");
-            return;
-        };
-        let Ok(fork_graph) = fork_graph.read() else {
-            error!("Failed to lock fork graph for reading.");
-            return;
-        };
+        let fork_graph = self
+            .fork_graph
+            .as_ref()
+            .cloned()
+            .expect("Program cache doesn't have fork graph.");
+        let fork_graph = fork_graph
+            .read()
+            .expect("Failed to lock fork graph for reading.");
         for second_level in self.entries.values_mut() {
             // Remove entries un/re/deployed on orphan forks
             let mut first_ancestor_found = false;
@@ -873,6 +873,81 @@ impl LoadedPrograms {
         extracted
     }
 
+    /// In cooperative loading a TX batch calls this to receive the next task
+    pub fn next_cooperative_loading_task(
+        &mut self,
+        extracted: &Arc<Mutex<ExtractedPrograms>>,
+    ) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
+        // The Mutexes are strictly speaking unnecessary
+        // because the entire `LoadedPrograms` cache is already locked here.
+        let extracted = extracted.lock().unwrap();
+        let (key, (entry, reload)) =
+            extracted.loading.iter().find(|(_key, (entry, _reload))| {
+                let LoadedProgramType::Loading(mutex) = &entry.program else {
+                    debug_assert!(false);
+                    return false;
+                };
+                let processing = mutex.lock().unwrap().0;
+                !processing
+            })?;
+        let (key, entry, reload) = (*key, entry.clone(), *reload);
+        drop(extracted);
+        {
+            let LoadedProgramType::Loading(mutex) = &entry.program else {
+                debug_assert!(false);
+                return None;
+            };
+            let processing = &mut mutex.lock().unwrap().0;
+            *processing = true;
+        }
+        Some((key, entry, reload))
+    }
+
+    /// Upon completing a task in cooperative loading a TX batch calls this to submit the result
+    pub fn cooperative_loading_task_complete(
+        &mut self,
+        key: Pubkey,
+        loading: Arc<LoadedProgram>,
+        loaded: Arc<LoadedProgram>,
+    ) {
+        let LoadedProgramType::Loading(mutex) = &loading.program else {
+            debug_assert!(false);
+            return;
+        };
+        let mut mutex = mutex.lock().unwrap();
+        let processing = &mut mutex.0;
+        *processing = false;
+        let waiting_list_is_empty = {
+            let fork_graph = self
+                .fork_graph
+                .as_ref()
+                .expect("Program cache doesn't have fork graph.");
+            let fork_graph = fork_graph
+                .read()
+                .expect("Failed to lock fork graph for reading.");
+            let waiting_list = &mut mutex.1;
+            waiting_list.retain(|waiting| {
+                // The Mutex around `waiting` is strictly speaking unnecessary
+                // because the entire `LoadedPrograms` cache is already locked here.
+                let mut waiting = waiting.lock().unwrap();
+                let relation = fork_graph.relationship(loaded.deployment_slot, waiting.loaded.slot);
+                if loaded.deployment_slot <= self.latest_root_slot
+                    || matches!(relation, BlockRelation::Equal | BlockRelation::Descendant)
+                {
+                    waiting.loading.remove(&key);
+                    waiting.loaded.assign_program(key, loaded.clone());
+                    return false;
+                }
+                true
+            });
+            waiting_list.is_empty()
+        };
+        if waiting_list_is_empty {
+            self.remove_program(key, &loading);
+        }
+        self.assign_program(key, loaded);
+    }
+
     pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
         tx_batch_cache.entries.iter().for_each(|(key, entry)| {
             self.assign_program(*key, entry.clone());
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 5c7a89bc3a6e5e..e9aa2024d285d4 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -193,7 +193,7 @@ use {
         sync::{
             atomic::{
                 AtomicBool, AtomicI64, AtomicU64, AtomicUsize,
-                Ordering::{self, AcqRel, Acquire, Relaxed},
+                Ordering::{AcqRel, Acquire, Relaxed},
             },
             Arc, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard,
         },
@@ -277,7 +277,6 @@ pub struct BankRc {
 
 #[cfg(RUSTC_WITH_SPECIALIZATION)]
 use solana_frozen_abi::abi_example::AbiExample;
-use solana_program_runtime::loaded_programs::ExtractedPrograms;
 
 #[cfg(RUSTC_WITH_SPECIALIZATION)]
 impl AbiExample for BankRc {
@@ -5030,50 +5029,43 @@ impl Bank {
                 .collect()
         };
 
-        let ExtractedPrograms {
-            loaded: mut loaded_programs_for_txs,
-            missing,
-            unloaded,
-        } = {
+        let extracted = {
             // Lock the global cache to figure out which programs need to be loaded
-            let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
+            let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
             loaded_programs_cache.extract(self, programs_and_slots.into_iter())
         };
 
-        // Load missing programs while global cache is unlocked
-        let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
-            .iter()
-            .map(|(key, count)| {
-                let program = self.load_program(key, false);
-                program.tx_usage_counter.store(*count, Ordering::Relaxed);
-                (*key, program)
-            })
-            .collect();
-
-        // Reload unloaded programs while global cache is unlocked
-        let unloaded_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = unloaded
-            .iter()
-            .map(|(key, count)| {
-                let program = self.load_program(key, true);
-                program.tx_usage_counter.store(*count, Ordering::Relaxed);
-                (*key, program)
-            })
-            .collect();
-
-        // Lock the global cache again to replenish the missing programs
-        let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
-        for (key, program) in missing_programs {
-            let entry = loaded_programs_cache.assign_program(key, program);
-            // Use the returned entry as that might have been deduplicated globally
-            loaded_programs_for_txs.assign_program(key, entry);
-        }
-        for (key, program) in unloaded_programs {
-            let entry = loaded_programs_cache.assign_program(key, program);
-            // Use the returned entry as that might have been deduplicated globally
-            loaded_programs_for_txs.assign_program(key, entry);
+        // Cooperative loading phase
+        let mut finished_task = None;
+        loop {
+            // Critical section for global coordination
+            let (key, loading, reload) = {
+                let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
+                if let Some((key, loading, loaded)) = finished_task.take() {
+                    loaded_programs_cache.cooperative_loading_task_complete(key, loading, loaded);
+                }
+                if Arc::strong_count(&extracted) == 1 {
+                    // All the missing entries for this batch have been loaded
+                    break;
+                }
+                if let Some(task) =
+                    loaded_programs_cache.next_cooperative_loading_task(&extracted)
+                {
+                    task
+                } else {
+                    // Waiting for some other TX batch to complete loading the programs needed by this TX batch
+                    // TODO: Use a Condvar here
+                    continue;
+                }
+            };
+            // Load, verify and compile the program outside of the critical section
+            let loaded = self.load_program(&key, reload);
+            finished_task = Some((key, loading, loaded));
         }
-        loaded_programs_for_txs
+        // When we get here we should be the only remaining owner
+        std::sync::Mutex::into_inner(Arc::into_inner(extracted).unwrap())
+            .unwrap()
+            .loaded
     }
 
     #[allow(clippy::type_complexity)]
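
Not part of the diff above: a minimal, self-contained sketch of the coordination pattern the two new methods introduce, for reviewers who want to see the claim/complete handshake in isolation. Every name in it (`GlobalCache`, `Extracted`, `extract`, `next_task`, `task_complete`, `Key`, `Program`) is a simplified stand-in, not the real `LoadedPrograms`/`ExtractedPrograms` API; the fork-graph visibility checks and the per-entry `LoadedProgramType::Loading` mutex are deliberately omitted.

```rust
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

// Simplified stand-ins for Pubkey and the compiled program artifact.
type Key = u64;
type Program = String;

/// Rough analogue of `ExtractedPrograms`: what a TX batch still has to load
/// (`loading`, with a "currently being processed" flag) and what it already has (`loaded`).
struct Extracted {
    loading: HashMap<Key, bool>,
    loaded: HashMap<Key, Program>,
}

/// Rough analogue of the global cache's coordination role.
#[derive(Default)]
struct GlobalCache {
    // Batches that still miss entries. The cache drops its clone of a batch's Arc
    // once nothing is missing, so the batch can use `Arc::strong_count == 1` as "done".
    waiting: Vec<Arc<Mutex<Extracted>>>,
}

impl GlobalCache {
    fn extract(&mut self, missing: &[Key]) -> Arc<Mutex<Extracted>> {
        let extracted = Arc::new(Mutex::new(Extracted {
            loading: missing.iter().map(|key| (*key, false)).collect(),
            loaded: HashMap::new(),
        }));
        self.waiting.push(extracted.clone());
        extracted
    }

    /// Claim the next unclaimed key of this batch and mark it as in progress.
    fn next_task(&mut self, extracted: &Arc<Mutex<Extracted>>) -> Option<Key> {
        let mut extracted = extracted.lock().unwrap();
        let key = *extracted.loading.iter().find(|(_, busy)| !**busy)?.0;
        extracted.loading.insert(key, true);
        Some(key)
    }

    /// Publish a finished program to every batch waiting on it and forget
    /// batches that have nothing left to load.
    fn task_complete(&mut self, key: Key, program: Program) {
        self.waiting.retain(|batch| {
            let mut batch = batch.lock().unwrap();
            if batch.loading.remove(&key).is_some() {
                batch.loaded.insert(key, program.clone());
            }
            !batch.loading.is_empty()
        });
    }
}

/// Caller-side loop, mirroring the new code in `bank.rs`.
fn run_batch(cache: &Mutex<GlobalCache>, missing: &[Key]) -> HashMap<Key, Program> {
    let extracted = cache.lock().unwrap().extract(missing);
    let mut finished: Option<(Key, Program)> = None;
    loop {
        // Critical section: submit the previous result, then claim the next task.
        let key = {
            let mut cache = cache.lock().unwrap();
            if let Some((key, program)) = finished.take() {
                cache.task_complete(key, program);
            }
            if Arc::strong_count(&extracted) == 1 {
                break; // the cache dropped this batch: nothing is missing any more
            }
            match cache.next_task(&extracted) {
                Some(key) => key,
                None => continue, // another batch is loading what we need (TODO: Condvar)
            }
        };
        // The slow part (load, verify, compile) happens outside the critical section.
        finished = Some((key, format!("compiled-{key}")));
    }
    Arc::into_inner(extracted).unwrap().into_inner().unwrap().loaded
}

fn main() {
    let cache = Mutex::new(GlobalCache::default());
    let loaded = run_batch(&cache, &[1, 2, 3]);
    assert_eq!(loaded.len(), 3);
}
```

The point of the pattern: the global cache lock is held only to claim a task or publish a result, the expensive load/verify/compile work runs outside the critical section, and a batch knows it is done when the cache drops its reference to the batch's extracted set (observed as `Arc::strong_count == 1`), which is how the `bank.rs` hunk above can safely call `Arc::into_inner` at the end.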