Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Adds LoadedPrograms::next_cooperative_loading_task() and LoadedPrograms::cooperative_loading_task_complete().
Browse files Browse the repository at this point in the history
  • Loading branch information
Lichtso committed Oct 27, 2023
1 parent 787ef14 commit c036405
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 49 deletions.
93 changes: 84 additions & 9 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
timings::ExecuteDetailsTimings,
},
itertools::Itertools,
log::{debug, error, log_enabled, trace},
log::{debug, log_enabled, trace},
percentage::PercentageInteger,
solana_measure::measure::Measure,
solana_rbpf::{
Expand Down Expand Up @@ -658,14 +658,14 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {

/// Before rerooting the blockstore this removes all superfluous entries
pub fn prune(&mut self, new_root_slot: Slot, new_root_epoch: Epoch) {
let Some(fork_graph) = self.fork_graph.clone() else {
error!("Program cache doesn't have fork graph.");
return;
};
let Ok(fork_graph) = fork_graph.read() else {
error!("Failed to lock fork graph for reading.");
return;
};
let fork_graph = self
.fork_graph
.as_ref()
.cloned()
.expect("Program cache doesn't have fork graph.");
let fork_graph = fork_graph
.read()
.expect("Failed to lock fork graph for reading.");
for second_level in self.entries.values_mut() {
// Remove entries un/re/deployed on orphan forks
let mut first_ancestor_found = false;
Expand Down Expand Up @@ -873,6 +873,81 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
extracted
}

/// In cooperative loading a TX batch calls this to receive the next task
///
/// Scans the `loading` map of `extracted` for the first entry that no other
/// TX batch is currently processing, marks that entry as in-progress and
/// hands it out as `(key, entry, reload)`. Returns `None` when every pending
/// entry is already being worked on by some other batch.
pub fn next_cooperative_loading_task(
    &mut self,
    extracted: &Arc<Mutex<ExtractedPrograms>>,
) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
    // The Mutexes are strictly speaking unnecessary
    // because the entire `LoadedPrograms` cache is already locked here
    // (we hold `&mut self`).
    let extracted = extracted.lock().unwrap();
    for (key, (entry, reload)) in extracted.loading.iter() {
        let LoadedProgramType::Loading(mutex) = &entry.program else {
            // Entries in `loading` are expected to always be in the
            // `Loading` state; anything else is a logic error.
            debug_assert!(false);
            continue;
        };
        let mut status = mutex.lock().unwrap();
        let processing = &mut status.0;
        if !*processing {
            // Claim the task while both locks are still held, so there is
            // no window in which the entry could be matched and locked a
            // second time (the original dropped `extracted` and re-locked).
            *processing = true;
            return Some((*key, entry.clone(), *reload));
        }
    }
    None
}

/// Upon completing a task in cooperative loading a TX batch calls this to submit the result
///
/// Clears the in-progress flag on the `loading` placeholder, distributes the
/// freshly `loaded` program to every waiting TX batch whose slot can observe
/// it, and finally installs `loaded` into the global cache.
pub fn cooperative_loading_task_complete(
    &mut self,
    key: Pubkey,
    loading: Arc<LoadedProgram>,
    loaded: Arc<LoadedProgram>,
) {
    // `loading` must be the placeholder handed out by
    // `next_cooperative_loading_task()`, i.e. still in the `Loading` state.
    let LoadedProgramType::Loading(mutex) = &loading.program else {
        debug_assert!(false);
        return;
    };
    let mut mutex = mutex.lock().unwrap();
    // Tuple field 0 is the "currently being processed" flag; the task is done.
    let processing = &mut mutex.0;
    *processing = false;
    let waiting_list_is_empty = {
        // NOTE: panics (rather than returning) if the fork graph is absent or
        // its lock is poisoned — both are treated as unrecoverable here.
        let fork_graph = self
            .fork_graph
            .as_ref()
            .expect("Program cache doesn't have fork graph.");
        let fork_graph = fork_graph
            .read()
            .expect("Failed to lock fork graph for reading.");
        // Tuple field 1 is the list of TX batches waiting on this program.
        let waiting_list = &mut mutex.1;
        waiting_list.retain(|waiting| {
            // The Mutex around `waiting` is strictly speaking unnecessary
            // because the entire `LoadedPrograms` cache is already locked here.
            let mut waiting = waiting.lock().unwrap();
            let relation = fork_graph.relationship(loaded.deployment_slot, waiting.loaded.slot);
            // Hand the program to this batch only if its slot can see the
            // deployment: either the deployment is at/below the latest root,
            // or the batch's slot equals/descends from the deployment slot.
            if loaded.deployment_slot <= self.latest_root_slot
                || matches!(relation, BlockRelation::Equal | BlockRelation::Descendant)
            {
                waiting.loading.remove(&key);
                waiting.loaded.assign_program(key, loaded.clone());
                // Satisfied batches are dropped from the waiting list.
                return false;
            }
            true
        });
        waiting_list.is_empty()
    };
    if waiting_list_is_empty {
        // No batch is waiting anymore, so the placeholder entry can be
        // removed from the global cache before the real entry is installed.
        self.remove_program(key, &loading);
    }
    self.assign_program(key, loaded);
}

pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
tx_batch_cache.entries.iter().for_each(|(key, entry)| {
self.assign_program(*key, entry.clone());
Expand Down
72 changes: 32 additions & 40 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ use {
sync::{
atomic::{
AtomicBool, AtomicI64, AtomicU64, AtomicUsize,
Ordering::{self, AcqRel, Acquire, Relaxed},
Ordering::{AcqRel, Acquire, Relaxed},
},
Arc, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
Expand Down Expand Up @@ -277,7 +277,6 @@ pub struct BankRc {

#[cfg(RUSTC_WITH_SPECIALIZATION)]
use solana_frozen_abi::abi_example::AbiExample;
use solana_program_runtime::loaded_programs::ExtractedPrograms;

#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl AbiExample for BankRc {
Expand Down Expand Up @@ -5030,50 +5029,43 @@ impl Bank {
.collect()
};

let ExtractedPrograms {
loaded: mut loaded_programs_for_txs,
missing,
unloaded,
} = {
let extracted = {
// Lock the global cache to figure out which programs need to be loaded
let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
loaded_programs_cache.extract(self, programs_and_slots.into_iter())
};

// Load missing programs while global cache is unlocked
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
.iter()
.map(|(key, count)| {
let program = self.load_program(key, false);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
(*key, program)
})
.collect();

// Reload unloaded programs while global cache is unlocked
let unloaded_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = unloaded
.iter()
.map(|(key, count)| {
let program = self.load_program(key, true);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
(*key, program)
})
.collect();

// Lock the global cache again to replenish the missing programs
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
for (key, program) in missing_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
}
for (key, program) in unloaded_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
// Cooperative loading phase
let mut finished_task = None;
loop {
// Critical section for global coordination
let (key, loading, reload) = {
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
if let Some((key, loading, loaded)) = finished_task.take() {
loaded_programs_cache.cooperative_loading_task_complete(key, loading, loaded);
}
if Arc::strong_count(&extracted) == 1 {
// All the missing entries for this batch have been loaded
break;
}
if let Some(task) = loaded_programs_cache.next_cooperative_loading_task(&extracted)
{
task
} else {
// Waiting for some other TX batch to complete loading the programs needed by this TX batch
// TODO: Use a Condvar here
continue;
}
};
// Load, verify and compile the program outside of the critical section
let loaded = self.load_program(&key, reload);
finished_task = Some((key, loading, loaded));
}

loaded_programs_for_txs
// When we get here we should be the only remaining owner
std::sync::Mutex::into_inner(Arc::into_inner(extracted).unwrap())
.unwrap()
.loaded
}

#[allow(clippy::type_complexity)]
Expand Down

0 comments on commit c036405

Please sign in to comment.