diff --git a/benches/bank.rs b/benches/bank.rs index 3925ffce97474a..491e9d7c51751f 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -42,15 +42,16 @@ fn bench_process_transaction(bencher: &mut Bencher) { let mut id = bank.last_id(); - for _ in 0..(MAX_ENTRY_IDS - 1) { + // TPU rejects ids that are to close to the last + for _ in 0..MAX_ENTRY_IDS / 2 - 1 { + id = hash(&id.as_ref()); bank.register_tick(&id); - id = hash(&id.as_ref()) } bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. bank.clear_signatures(); let results = bank.process_transactions(&transactions); - assert!(results.iter().all(Result::is_ok)); + results.iter().for_each(|r| assert_eq!(Ok(()), *r)); }) } diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index a8a03345ab0aae..b931c4ab9bdd79 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -113,7 +113,7 @@ fn main() { } ("verify", _) => { let bank = Bank::new(&genesis_block); - let mut last_id = bank.last_id(); + let mut last_id = bank.active_fork().last_id(); let mut num_entries = 0; for (i, entry) in entries.enumerate() { if i >= head { @@ -129,7 +129,7 @@ fn main() { last_id = entry.id; num_entries += 1; - if let Err(e) = bank.process_entry(&entry) { + if let Err(e) = bank.active_fork().process_entries(&[entry]) { eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e); if !matches.is_present("continue") { exit(1); diff --git a/src/accounts.rs b/src/accounts.rs index b0670289b9349e..0ce9d7d66f44d4 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -80,11 +80,11 @@ impl AccountsDB { hash(&serialize(&ordered_accounts).unwrap()) } - fn load(checkpoints: &[U], pubkey: &Pubkey) -> Option + fn load(deltas: &[U], pubkey: &Pubkey) -> Option where U: Deref, { - for db in checkpoints { + for db in deltas { if let Some(account) = db.accounts.get(pubkey) { return Some(account.clone()); } @@ -96,7 +96,7 @@ impl AccountsDB { pub fn store(&mut self, purge: bool, pubkey: 
&Pubkey, account: &Account) { if account.tokens == 0 { if purge { - // purge if balance is 0 and no checkpoints + // purge if balance is 0 and no deltas self.accounts.remove(pubkey); } else { // store default account if balance is 0 and there's a checkpoint @@ -127,7 +127,7 @@ impl AccountsDB { } } fn load_tx_accounts( - checkpoints: &[U], + deltas: &[U], tx: &Transaction, error_counters: &mut ErrorCounters, ) -> Result> @@ -148,7 +148,7 @@ impl AccountsDB { // If a fee can pay for execution then the program will be scheduled let mut called_accounts: Vec = vec![]; for key in &tx.account_keys { - called_accounts.push(Self::load(checkpoints, key).unwrap_or_default()); + called_accounts.push(Self::load(deltas, key).unwrap_or_default()); } if called_accounts.is_empty() || called_accounts[0].tokens == 0 { error_counters.account_not_found += 1; @@ -164,7 +164,7 @@ impl AccountsDB { } fn load_executable_accounts( - checkpoints: &[U], + deltas: &[U], mut program_id: Pubkey, error_counters: &mut ErrorCounters, ) -> Result> @@ -185,7 +185,7 @@ impl AccountsDB { } depth += 1; - let program = match Self::load(checkpoints, &program_id) { + let program = match Self::load(deltas, &program_id) { Some(program) => program, None => { error_counters.account_not_found += 1; @@ -207,7 +207,7 @@ impl AccountsDB { /// For each program_id in the transaction, load its loaders. 
fn load_loaders( - checkpoints: &[U], + deltas: &[U], tx: &Transaction, error_counters: &mut ErrorCounters, ) -> Result>> @@ -222,13 +222,13 @@ impl AccountsDB { return Err(BankError::AccountNotFound); } let program_id = tx.program_ids[ix.program_ids_index as usize]; - Self::load_executable_accounts(checkpoints, program_id, error_counters) + Self::load_executable_accounts(deltas, program_id, error_counters) }) .collect() } fn load_accounts( - checkpoints: &[U], + deltas: &[U], txs: &[Transaction], lock_results: Vec>, error_counters: &mut ErrorCounters, @@ -240,8 +240,8 @@ impl AccountsDB { .zip(lock_results.into_iter()) .map(|etx| match etx { (tx, Ok(())) => { - let accounts = Self::load_tx_accounts(checkpoints, tx, error_counters)?; - let loaders = Self::load_loaders(checkpoints, tx, error_counters)?; + let accounts = Self::load_tx_accounts(deltas, tx, error_counters)?; + let loaders = Self::load_loaders(deltas, tx, error_counters)?; Ok((accounts, loaders)) } (_, Err(e)) => Err(e), @@ -267,11 +267,11 @@ impl AccountsDB { impl Accounts { /// Slow because lock is held for 1 operation insted of many - pub fn load_slow(checkpoints: &[U], pubkey: &Pubkey) -> Option + pub fn load_slow(deltas: &[U], pubkey: &Pubkey) -> Option where U: Deref, { - let dbs: Vec<_> = checkpoints + let dbs: Vec<_> = deltas .iter() .map(|obj| obj.accounts_db.read().unwrap()) .collect(); @@ -349,7 +349,7 @@ impl Accounts { } pub fn load_accounts( - checkpoints: &[U], + deltas: &[U], txs: &[Transaction], results: Vec>, error_counters: &mut ErrorCounters, @@ -357,7 +357,7 @@ impl Accounts { where U: Deref, { - let dbs: Vec<_> = checkpoints + let dbs: Vec<_> = deltas .iter() .map(|obj| obj.accounts_db.read().unwrap()) .collect(); diff --git a/src/bank.rs b/src/bank.rs index 598b4ce988a345..28124e4ab82aa5 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -3,20 +3,15 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. 
-use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders}; -use crate::counter::Counter; +use crate::bank_delta::BankDelta; +use crate::bank_fork::BankFork; use crate::entry::Entry; +use crate::forks::{self, Forks, ForksError}; use crate::genesis_block::GenesisBlock; -use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; use crate::leader_scheduler::LeaderScheduler; -use crate::poh_recorder::{PohRecorder, PohRecorderError}; -use crate::result::Error; +use crate::poh_recorder::PohRecorder; use crate::rpc_pubsub::RpcSubscriptions; -use crate::status_cache::StatusCache; use bincode::deserialize; -use log::Level; -use rayon::prelude::*; -use solana_runtime::{self, RuntimeError}; use solana_sdk::account::Account; use solana_sdk::bpf_loader; use solana_sdk::budget_program; @@ -29,14 +24,12 @@ use solana_sdk::signature::Signature; use solana_sdk::storage_program; use solana_sdk::system_program; use solana_sdk::system_transaction::SystemTransaction; -use solana_sdk::timing::duration_as_us; use solana_sdk::token_program; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program::{self, VoteState}; use std; use std::result; use std::sync::{Arc, RwLock}; -use std::time::Instant; /// Reasons a transaction might be rejected. #[derive(Debug, PartialEq, Eq, Clone)] @@ -80,6 +73,18 @@ pub enum BankError { // Poh recorder hit the maximum tick height before leader rotation MaxHeightReached, + + /// Fork is not in the Deltas DAG + UnknownFork, + + /// The specified trunk is not in the Deltas DAG + InvalidTrunk, + + /// Specified base delta is still live + DeltaNotFrozen, + + /// Requested live delta is frozen + DeltaIsFrozen, } pub type Result = result::Result; @@ -89,27 +94,16 @@ pub trait BankSubscriptions { fn check_signature(&self, signature: &Signature, status: &Result<()>); } -type BankStatusCache = StatusCache; - /// Manager for the state of all accounts and programs after processing its entries. 
pub struct Bank { - accounts: Accounts, - - /// A cache of signature statuses - status_cache: RwLock, - - /// FIFO queue of `last_id` items - last_id_queue: RwLock, - + forks: RwLock, subscriptions: RwLock>>, } impl Default for Bank { fn default() -> Self { Self { - accounts: Accounts::default(), - last_id_queue: RwLock::new(LastIdQueue::default()), - status_cache: RwLock::new(BankStatusCache::default()), + forks: RwLock::new(Forks::default()), subscriptions: RwLock::new(None), } } @@ -118,25 +112,71 @@ impl Default for Bank { impl Bank { pub fn new(genesis_block: &GenesisBlock) -> Self { let bank = Self::default(); + let last_id = genesis_block.last_id(); + bank.init_root(&last_id); bank.process_genesis_block(genesis_block); bank.add_builtin_programs(); bank } + fn init_fork(&self, current: u64, last_id: &Hash, base: u64) -> Result<()> { + trace!("new fork current: {} base: {}", current, base); + if self.forks.read().unwrap().is_active_fork(current) { + let parent = self.forks.read().unwrap().deltas.load(current).unwrap().1; + assert_eq!( + parent, base, + "fork initialised a second time with a different base" + ); + trace!("already active: {}", current); + return Ok(()); + } + self.forks + .write() + .unwrap() + .init_fork(current, last_id, base) + .map_err(|e| match e { + ForksError::UnknownFork => BankError::UnknownFork, + ForksError::InvalidTrunk => BankError::InvalidTrunk, + ForksError::DeltaNotFrozen => BankError::DeltaNotFrozen, + ForksError::DeltaIsFrozen => BankError::DeltaIsFrozen, + }) + } + + #[cfg(test)] + pub fn test_active_fork(&self) -> BankFork { + self.active_fork() + } + + fn active_fork(&self) -> BankFork { + self.forks.read().unwrap().active_fork() + } + fn root(&self) -> BankFork { + self.forks.read().unwrap().root() + } + pub fn fork(&self, slot: u64) -> Option { + self.forks.read().unwrap().fork(slot) + } + pub fn set_subscriptions(&self, subscriptions: Arc) { let mut sub = self.subscriptions.write().unwrap(); *sub = Some(subscriptions) } - 
pub fn copy_for_tpu(&self) -> Self { - let mut status_cache = BankStatusCache::default(); - status_cache.merge_into_root(self.status_cache.read().unwrap().clone()); - Self { - accounts: self.accounts.copy_for_tpu(), - status_cache: RwLock::new(status_cache), - last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()), - subscriptions: RwLock::new(None), - } + pub fn copy_for_tpu(&self) { + let current = self.active_fork().head().fork_id(); + let last_id = self.active_fork().last_id(); + self.active_fork().head().freeze(); + self.merge_into_root(current); + self.init_fork(current + 1, &last_id, current) + .expect("init_fork"); + } + + /// Init the root fork. Only tests should be using this. + pub fn init_root(&self, last_id: &Hash) { + self.forks + .write() + .unwrap() + .init_root(BankDelta::new(0, &last_id)); } fn process_genesis_block(&self, genesis_block: &GenesisBlock) { @@ -148,12 +188,13 @@ impl Bank { let mut mint_account = Account::default(); mint_account.tokens = genesis_block.tokens - genesis_block.bootstrap_leader_tokens; - self.accounts + self.root() + .head() .store_slow(true, &genesis_block.mint_id, &mint_account); let mut bootstrap_leader_account = Account::default(); bootstrap_leader_account.tokens = genesis_block.bootstrap_leader_tokens - 1; - self.accounts.store_slow( + self.root().head().store_slow( true, &genesis_block.bootstrap_leader_id, &bootstrap_leader_account, @@ -177,60 +218,66 @@ impl Bank { .serialize(&mut bootstrap_leader_vote_account.userdata) .unwrap(); - self.accounts.store_slow( + self.root().head().store_slow( true, &genesis_block.bootstrap_leader_vote_account_id, &bootstrap_leader_vote_account, ); - self.last_id_queue - .write() - .unwrap() - .genesis_last_id(&genesis_block.last_id()); + self.root() + .head() + .set_genesis_last_id(&genesis_block.last_id()); } fn add_builtin_programs(&self) { let system_program_account = native_loader::create_program_account("solana_system_program"); - self.accounts + self.root() + 
.head() .store_slow(true, &system_program::id(), &system_program_account); let vote_program_account = native_loader::create_program_account("solana_vote_program"); - self.accounts + self.root() + .head() .store_slow(true, &vote_program::id(), &vote_program_account); let storage_program_account = native_loader::create_program_account("solana_storage_program"); - self.accounts + self.root() + .head() .store_slow(true, &storage_program::id(), &storage_program_account); let storage_system_account = Account::new(1, 16 * 1024, storage_program::system_id()); - self.accounts + self.root() + .head() .store_slow(true, &storage_program::system_id(), &storage_system_account); let bpf_loader_account = native_loader::create_program_account("solana_bpf_loader"); - self.accounts + self.root() + .head() .store_slow(true, &bpf_loader::id(), &bpf_loader_account); let budget_program_account = native_loader::create_program_account("solana_budget_program"); - self.accounts + self.root() + .head() .store_slow(true, &budget_program::id(), &budget_program_account); let erc20_account = native_loader::create_program_account("solana_erc20"); - self.accounts + self.root() + .head() .store_slow(true, &token_program::id(), &erc20_account); } /// Return the last entry ID registered. pub fn last_id(&self) -> Hash { - self.last_id_queue - .read() - .unwrap() - .last_id - .expect("no last_id has been set") + self.active_fork().last_id() } pub fn get_storage_entry_height(&self) -> u64 { - match self.get_account(&storage_program::system_id()) { + //TODO: root or live? 
+ match self + .active_fork() + .get_account_slow(&storage_program::system_id()) + { Some(storage_system_account) => { let state = deserialize(&storage_system_account.userdata); if let Ok(state) = state { @@ -246,7 +293,10 @@ impl Bank { } pub fn get_storage_last_id(&self) -> Hash { - if let Some(storage_system_account) = self.get_account(&storage_program::system_id()) { + if let Some(storage_system_account) = self + .active_fork() + .get_account_slow(&storage_program::system_id()) + { let state = deserialize(&storage_system_account.userdata); if let Ok(state) = state { let state: storage_program::StorageProgramState = state; @@ -256,53 +306,15 @@ impl Bank { Hash::default() } - /// Forget all signatures. Useful for benchmarking. - pub fn clear_signatures(&self) { - self.status_cache.write().unwrap().clear(); - } - - fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) { - for (i, tx) in txs.iter().enumerate() { - if let Some(ref subs) = *self.subscriptions.read().unwrap() { - subs.check_signature(&tx.signatures[0], &res[i]); - } - } - } - fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { - let mut status_cache = self.status_cache.write().unwrap(); - for (i, tx) in txs.iter().enumerate() { - match &res[i] { - Ok(_) => status_cache.add(&tx.signatures[0]), - Err(BankError::LastIdNotFound) => (), - Err(BankError::DuplicateSignature) => (), - Err(BankError::AccountNotFound) => (), - Err(e) => { - status_cache.add(&tx.signatures[0]); - status_cache.save_failure_status(&tx.signatures[0], e.clone()); - } - } - } - } - - /// Looks through a list of tick heights and stakes, and finds the latest - /// tick that has achieved confirmation - pub fn get_confirmation_timestamp( + #[must_use] + pub fn process_and_record_transactions( &self, - ticks_and_stakes: &mut [(u64, u64)], - supermajority_stake: u64, - ) -> Option { - let last_ids = self.last_id_queue.read().unwrap(); - last_ids.get_confirmation_timestamp(ticks_and_stakes, 
supermajority_stake) - } - - /// Tell the bank which Entry IDs exist on the ledger. This function - /// assumes subsequent calls correspond to later entries, and will boot - /// the oldest ones once its internal cache is full. Once boot, the - /// bank will reject transactions using that `last_id`. - pub fn register_tick(&self, last_id: &Hash) { - let mut last_id_queue = self.last_id_queue.write().unwrap(); - inc_new_counter_info!("bank-register_tick-registered", 1); - last_id_queue.register_tick(last_id); + txs: &[Transaction], + poh: Option<&PohRecorder>, + ) -> Result>> { + let sub = self.subscriptions.read().unwrap(); + self.active_fork() + .process_and_record_transactions(&sub, txs, poh) } /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. @@ -317,295 +329,11 @@ impl Bank { } } - fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { - self.accounts.lock_accounts(txs) - } - - fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { - self.accounts.unlock_accounts(txs, results) - } - - pub fn process_and_record_transactions( - &self, - txs: &[Transaction], - poh: &PohRecorder, - ) -> Result<()> { - let now = Instant::now(); - // Once accounts are locked, other threads cannot encode transactions that will modify the - // same account state - let lock_results = self.lock_accounts(txs); - let lock_time = now.elapsed(); - - let now = Instant::now(); - // Use a shorter maximum age when adding transactions into the pipeline. This will reduce - // the likelihood of any single thread getting starved and processing old ids. - // TODO: Banking stage threads should be prioritized to complete faster then this queue - // expires. 
- let (loaded_accounts, results) = - self.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2); - let load_execute_time = now.elapsed(); - - let record_time = { - let now = Instant::now(); - self.record_transactions(txs, &results, poh)?; - now.elapsed() - }; - - let commit_time = { - let now = Instant::now(); - self.commit_transactions(txs, &loaded_accounts, &results); - now.elapsed() - }; - - let now = Instant::now(); - // Once the accounts are new transactions can enter the pipeline to process them - self.unlock_accounts(&txs, &results); - let unlock_time = now.elapsed(); - debug!( - "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}", - duration_as_us(&lock_time), - duration_as_us(&load_execute_time), - duration_as_us(&record_time), - duration_as_us(&commit_time), - duration_as_us(&unlock_time), - txs.len(), - ); - Ok(()) - } - - fn record_transactions( - &self, - txs: &[Transaction], - results: &[Result<()>], - poh: &PohRecorder, - ) -> Result<()> { - let processed_transactions: Vec<_> = results - .iter() - .zip(txs.iter()) - .filter_map(|(r, x)| match r { - Ok(_) => Some(x.clone()), - Err(BankError::ProgramError(index, err)) => { - info!("program error {:?}, {:?}", index, err); - Some(x.clone()) - } - Err(ref e) => { - debug!("process transaction failed {:?}", e); - None - } - }) - .collect(); - debug!("processed: {} ", processed_transactions.len()); - // unlock all the accounts with errors which are filtered by the above `filter_map` - if !processed_transactions.is_empty() { - let hash = Transaction::hash(&processed_transactions); - // record and unlock will unlock all the successfull transactions - poh.record(hash, processed_transactions).map_err(|e| { - warn!("record failure: {:?}", e); - match e { - Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { - BankError::MaxHeightReached - } - _ => BankError::RecordFailure, - } - })?; - } - Ok(()) - } - - fn load_accounts( - &self, - txs: 
&[Transaction], - results: Vec>, - error_counters: &mut ErrorCounters, - ) -> Vec> { - Accounts::load_accounts(&[&self.accounts], txs, results, error_counters) - } - fn check_age( - &self, - txs: &[Transaction], - lock_results: Vec>, - max_age: usize, - error_counters: &mut ErrorCounters, - ) -> Vec> { - let last_ids = self.last_id_queue.read().unwrap(); - txs.iter() - .zip(lock_results.into_iter()) - .map(|(tx, lock_res)| { - if lock_res.is_ok() && !last_ids.check_entry_id_age(tx.last_id, max_age) { - error_counters.reserve_last_id += 1; - Err(BankError::LastIdNotFound) - } else { - lock_res - } - }) - .collect() - } - fn check_signatures( - &self, - txs: &[Transaction], - lock_results: Vec>, - error_counters: &mut ErrorCounters, - ) -> Vec> { - let status_cache = self.status_cache.read().unwrap(); - txs.iter() - .zip(lock_results.into_iter()) - .map(|(tx, lock_res)| { - if lock_res.is_ok() && status_cache.has_signature(&tx.signatures[0]) { - error_counters.duplicate_signature += 1; - Err(BankError::DuplicateSignature) - } else { - lock_res - } - }) - .collect() - } - #[allow(clippy::type_complexity)] - fn load_and_execute_transactions( - &self, - txs: &[Transaction], - lock_results: Vec>, - max_age: usize, - ) -> ( - Vec>, - Vec>, - ) { - debug!("processing transactions: {}", txs.len()); - let mut error_counters = ErrorCounters::default(); - let now = Instant::now(); - let age_results = self.check_age(txs, lock_results, max_age, &mut error_counters); - let sig_results = self.check_signatures(txs, age_results, &mut error_counters); - let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); - let tick_height = self.tick_height(); - - let load_elapsed = now.elapsed(); - let now = Instant::now(); - let executed: Vec> = loaded_accounts - .iter_mut() - .zip(txs.iter()) - .map(|(accs, tx)| match accs { - Err(e) => Err(e.clone()), - Ok((ref mut accounts, ref mut loaders)) => { - solana_runtime::execute_transaction(tx, loaders, accounts, 
tick_height).map_err( - |RuntimeError::ProgramError(index, err)| { - BankError::ProgramError(index, err) - }, - ) - } - }) - .collect(); - - let execution_elapsed = now.elapsed(); - - debug!( - "load: {}us execute: {}us txs_len={}", - duration_as_us(&load_elapsed), - duration_as_us(&execution_elapsed), - txs.len(), - ); - let mut tx_count = 0; - let mut err_count = 0; - for (r, tx) in executed.iter().zip(txs.iter()) { - if r.is_ok() { - tx_count += 1; - } else { - if err_count == 0 { - info!("tx error: {:?} {:?}", r, tx); - } - err_count += 1; - } - } - if err_count > 0 { - info!("{} errors of {} txs", err_count, err_count + tx_count); - inc_new_counter_info!( - "bank-process_transactions-account_not_found", - error_counters.account_not_found - ); - inc_new_counter_info!("bank-process_transactions-error_count", err_count); - } - - self.accounts.increment_transaction_count(tx_count); - - inc_new_counter_info!("bank-process_transactions-txs", tx_count); - if 0 != error_counters.last_id_not_found { - inc_new_counter_info!( - "bank-process_transactions-error-last_id_not_found", - error_counters.last_id_not_found - ); - } - if 0 != error_counters.reserve_last_id { - inc_new_counter_info!( - "bank-process_transactions-error-reserve_last_id", - error_counters.reserve_last_id - ); - } - if 0 != error_counters.duplicate_signature { - inc_new_counter_info!( - "bank-process_transactions-error-duplicate_signature", - error_counters.duplicate_signature - ); - } - if 0 != error_counters.insufficient_funds { - inc_new_counter_info!( - "bank-process_transactions-error-insufficient_funds", - error_counters.insufficient_funds - ); - } - if 0 != error_counters.account_loaded_twice { - inc_new_counter_info!( - "bank-process_transactions-account_loaded_twice", - error_counters.account_loaded_twice - ); - } - (loaded_accounts, executed) - } - - fn commit_transactions( - &self, - txs: &[Transaction], - loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>], - executed: 
&[Result<()>], - ) { - let now = Instant::now(); - self.accounts - .store_accounts(true, txs, executed, loaded_accounts); - - // Check account subscriptions and send notifications - self.send_account_notifications(txs, executed, loaded_accounts); - - // once committed there is no way to unroll - let write_elapsed = now.elapsed(); - debug!( - "store: {}us txs_len={}", - duration_as_us(&write_elapsed), - txs.len(), - ); - self.update_transaction_statuses(txs, &executed); - self.update_subscriptions(txs, &executed); - } - - /// Process a batch of transactions. - #[must_use] - pub fn load_execute_and_commit_transactions( - &self, - txs: &[Transaction], - lock_results: Vec>, - max_age: usize, - ) -> Vec> { - let (loaded_accounts, executed) = - self.load_and_execute_transactions(txs, lock_results, max_age); - - self.commit_transactions(txs, &loaded_accounts, &executed); - executed - } - #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { - let lock_results = self.lock_accounts(txs); - let results = self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS); - self.unlock_accounts(txs, &results); - results + self.process_and_record_transactions(txs, None) + .expect("record skipped") } - pub fn process_entry(&self, entry: &Entry) -> Result<()> { if !entry.is_tick() { for result in self.process_transactions(&entry.transactions) { @@ -639,40 +367,6 @@ impl Bank { Ok(()) } - fn ignore_program_errors(results: Vec>) -> Vec> { - results - .into_iter() - .map(|result| match result { - // Entries that result in a ProgramError are still valid and are written in the - // ledger so map them to an ok return value - Err(BankError::ProgramError(index, err)) => { - info!("program error {:?}, {:?}", index, err); - inc_new_counter_info!("bank-ignore_program_err", 1); - Ok(()) - } - _ => result, - }) - .collect() - } - - fn par_execute_entries(&self, entries: &[(&Entry, Vec>)]) -> Result<()> { - 
inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); - let results: Vec> = entries - .into_par_iter() - .map(|(e, lock_results)| { - let old_results = self.load_execute_and_commit_transactions( - &e.transactions, - lock_results.to_vec(), - MAX_ENTRY_IDS, - ); - let results = Bank::ignore_program_errors(old_results); - self.unlock_accounts(&e.transactions, &results); - Self::first_err(&results) - }) - .collect(); - Self::first_err(&results) - } - /// process entries in parallel /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry /// 2. Process the locked group in parallel @@ -683,13 +377,14 @@ impl Bank { entries: &[Entry], leader_scheduler: &Arc>, ) -> Result<()> { + let bank_fork = self.active_fork(); // accumulator for entries that can be processed in parallel let mut mt_group = vec![]; for entry in entries { if entry.is_tick() { // if its a tick, execute the group and register the tick - self.par_execute_entries(&mt_group)?; - self.register_tick(&entry.id); + bank_fork.par_execute_entries(&mt_group)?; + bank_fork.register_tick(&entry.id); leader_scheduler .write() .unwrap() @@ -698,22 +393,24 @@ impl Bank { continue; } // try to lock the accounts - let lock_results = self.lock_accounts(&entry.transactions); + let lock_results = bank_fork.head().lock_accounts(&entry.transactions); // if any of the locks error out // execute the current group if Self::first_err(&lock_results).is_err() { - self.par_execute_entries(&mt_group)?; + bank_fork.par_execute_entries(&mt_group)?; mt_group = vec![]; //reset the lock and push the entry - self.unlock_accounts(&entry.transactions, &lock_results); - let lock_results = self.lock_accounts(&entry.transactions); + bank_fork + .head() + .unlock_accounts(&entry.transactions, &lock_results); + let lock_results = bank_fork.head().lock_accounts(&entry.transactions); mt_group.push((entry, lock_results)); } else { // push the entry to the mt_group mt_group.push((entry, lock_results)); 
} } - self.par_execute_entries(&mt_group)?; + bank_fork.par_execute_entries(&mt_group)?; Ok(()) } @@ -737,107 +434,82 @@ impl Bank { self.process_transaction(&tx).map(|_| signature) } - pub fn read_balance(account: &Account) -> u64 { - // TODO: Re-instate budget_program special case? - /* - if budget_program::check_id(&account.owner) { - return budget_program::get_balance(account); - } - */ - account.tokens - } - /// Each program would need to be able to introspect its own state - /// this is hard-coded to the Budget language pub fn get_balance(&self, pubkey: &Pubkey) -> u64 { - self.get_account(pubkey) - .map(|x| Self::read_balance(&x)) - .unwrap_or(0) + self.active_fork().get_balance_slow(pubkey) } pub fn get_account(&self, pubkey: &Pubkey) -> Option { - Accounts::load_slow(&[&self.accounts], pubkey) + self.active_fork().get_account_slow(pubkey) } - pub fn transaction_count(&self) -> u64 { - self.accounts.transaction_count() + /// Tell the bank which Entry IDs exist on the ledger. This function + /// assumes subsequent calls correspond to later entries, and will boot + /// the oldest ones once its internal cache is full. Once boot, the + /// bank will reject transactions using that `last_id`. + pub fn register_tick(&self, last_id: &Hash) { + self.active_fork().register_tick(last_id); } pub fn get_signature_status(&self, signature: &Signature) -> Option> { - self.status_cache - .read() - .unwrap() - .get_signature_status(signature) + self.active_fork().get_signature_status(signature) + } + + pub fn transaction_count(&self) -> u64 { + self.active_fork().transaction_count() } pub fn has_signature(&self, signature: &Signature) -> bool { - self.status_cache.read().unwrap().has_signature(signature) + self.active_fork().head().has_signature(signature) } - /// Hash the `accounts` HashMap. 
This represents a validator's interpretation - /// of the delta of the ledger since the last vote and up to now pub fn hash_internal_state(&self) -> Hash { - self.accounts.hash_internal_state() + self.active_fork().hash_internal_state() } - fn send_account_notifications( - &self, - txs: &[Transaction], - res: &[Result<()>], - loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], - ) { - for (i, raccs) in loaded.iter().enumerate() { - if res[i].is_err() || raccs.is_err() { - continue; - } + pub fn clear_signatures(&self) { + self.active_fork().clear_signatures(); + } - let tx = &txs[i]; - let accs = raccs.as_ref().unwrap(); - for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { - if let Some(ref subs) = *self.subscriptions.read().unwrap() { - subs.check_account(&key, account) - } - } - } + pub fn get_confirmation_timestamp( + &self, + ticks_and_stakes: &mut [(u64, u64)], + supermajority_stake: u64, + ) -> Option { + self.active_fork() + .head() + .get_confirmation_timestamp(ticks_and_stakes, supermajority_stake) } pub fn vote_states(&self, cond: F) -> Vec where F: Fn(&VoteState) -> bool, { - self.accounts - .accounts_db - .read() - .unwrap() - .accounts - .values() - .filter_map(|account| { - if vote_program::check_id(&account.owner) { - if let Ok(vote_state) = VoteState::deserialize(&account.userdata) { - if cond(&vote_state) { - return Some(vote_state); - } - } - } - None - }) - .collect() + self.active_fork().vote_states(cond) } - pub fn tick_height(&self) -> u64 { - self.last_id_queue.read().unwrap().tick_height + self.active_fork().tick_height() } - #[cfg(test)] - pub fn last_ids(&self) -> &RwLock { - &self.last_id_queue + /// An active chain is computed from the leaf_slot + /// The base that is a direct descendant of the root and is in the active chain to the leaf + /// is merged into root, and any forks not attached to the new root are purged. 
+ fn merge_into_root(&self, leaf_slot: u64) { + //there is only one base, and its the current live fork + self.forks + .write() + .unwrap() + .merge_into_root(forks::ROLLBACK_DEPTH, leaf_slot) + .expect("merge into root"); } } #[cfg(test)] mod tests { use super::*; + use crate::bank_fork::BankFork; use crate::entry::{next_entries, next_entry, Entry}; use crate::gen_keys::GenKeys; + use crate::poh_recorder::PohRecorder; use bincode::serialize; use hashbrown::HashSet; use solana_sdk::hash::hash; @@ -1199,79 +871,7 @@ mod tests { .unwrap(); assert_eq!(bank0.hash_internal_state(), bank1.hash_internal_state()); } - #[test] - fn test_interleaving_locks() { - let (genesis_block, mint_keypair) = GenesisBlock::new(3); - let bank = Bank::new(&genesis_block); - let alice = Keypair::new(); - let bob = Keypair::new(); - - let tx1 = SystemTransaction::new_account( - &mint_keypair, - alice.pubkey(), - 1, - genesis_block.last_id(), - 0, - ); - let pay_alice = vec![tx1]; - - let lock_result = bank.lock_accounts(&pay_alice); - let results_alice = - bank.load_execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS); - assert_eq!(results_alice[0], Ok(())); - - // try executing an interleaved transfer twice - assert_eq!( - bank.transfer(1, &mint_keypair, bob.pubkey(), genesis_block.last_id()), - Err(BankError::AccountInUse) - ); - // the second time should fail as well - // this verifies that `unlock_accounts` doesn't unlock `AccountInUse` accounts - assert_eq!( - bank.transfer(1, &mint_keypair, bob.pubkey(), genesis_block.last_id()), - Err(BankError::AccountInUse) - ); - - bank.unlock_accounts(&pay_alice, &results_alice); - - assert_matches!( - bank.transfer(2, &mint_keypair, bob.pubkey(), genesis_block.last_id()), - Ok(_) - ); - } - #[test] - fn test_first_err() { - assert_eq!(Bank::first_err(&[Ok(())]), Ok(())); - assert_eq!( - Bank::first_err(&[Ok(()), Err(BankError::DuplicateSignature)]), - Err(BankError::DuplicateSignature) - ); - assert_eq!( - Bank::first_err(&[ - 
Ok(()), - Err(BankError::DuplicateSignature), - Err(BankError::AccountInUse) - ]), - Err(BankError::DuplicateSignature) - ); - assert_eq!( - Bank::first_err(&[ - Ok(()), - Err(BankError::AccountInUse), - Err(BankError::DuplicateSignature) - ]), - Err(BankError::AccountInUse) - ); - assert_eq!( - Bank::first_err(&[ - Err(BankError::AccountInUse), - Ok(()), - Err(BankError::DuplicateSignature) - ]), - Err(BankError::AccountInUse) - ); - } #[test] fn test_par_process_entries_tick() { let (genesis_block, _mint_keypair) = GenesisBlock::new(1000); @@ -1503,8 +1103,7 @@ mod tests { ]; let mut results = vec![Ok(()), Ok(())]; - bank.record_transactions(&transactions, &results, &poh_recorder) - .unwrap(); + BankFork::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len()); @@ -1513,42 +1112,17 @@ mod tests { 1, ProgramError::ResultWithNegativeTokens, )); - bank.record_transactions(&transactions, &results, &poh_recorder) - .unwrap(); + BankFork::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len()); // Other BankErrors should not be recorded results[0] = Err(BankError::AccountNotFound); - bank.record_transactions(&transactions, &results, &poh_recorder) - .unwrap(); + BankFork::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len() - 1); } - #[test] - fn test_bank_ignore_program_errors() { - let expected_results = vec![Ok(()), Ok(())]; - let results = vec![Ok(()), Ok(())]; - let updated_results = Bank::ignore_program_errors(results); - assert_eq!(updated_results, expected_results); - - let results = vec![ - Err(BankError::ProgramError( - 1, - ProgramError::ResultWithNegativeTokens, - )), - Ok(()), - ]; - let 
updated_results = Bank::ignore_program_errors(results); - assert_eq!(updated_results, expected_results); - - // Other BankErrors should not be ignored - let results = vec![Err(BankError::AccountNotFound), Ok(())]; - let updated_results = Bank::ignore_program_errors(results); - assert_ne!(updated_results, expected_results); - } - #[test] fn test_bank_storage() { solana_logger::setup(); @@ -1618,7 +1192,7 @@ mod tests { bank.tick_height() + 1, ); - bank.process_and_record_transactions(&transactions, &poh_recorder) + bank.process_and_record_transactions(&transactions, Some(&poh_recorder)) .unwrap(); poh_recorder.tick().unwrap(); @@ -1645,7 +1219,7 @@ mod tests { )]; assert_eq!( - bank.process_and_record_transactions(&transactions, &poh_recorder), + bank.process_and_record_transactions(&transactions, Some(&poh_recorder)), Err(BankError::MaxHeightReached) ); diff --git a/src/bank_delta.rs b/src/bank_delta.rs new file mode 100644 index 00000000000000..e27d0b991800d7 --- /dev/null +++ b/src/bank_delta.rs @@ -0,0 +1,344 @@ +use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders}; +use crate::bank::{BankError, BankSubscriptions, Result}; +use crate::counter::Counter; +use crate::last_id_queue::LastIdQueue; +use crate::rpc_pubsub::RpcSubscriptions; +use crate::status_cache::StatusCache; +use log::Level; +use solana_sdk::account::Account; +use solana_sdk::hash::Hash; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use solana_sdk::timing::duration_as_us; +use solana_sdk::transaction::Transaction; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::Instant; + +type BankStatusCache = StatusCache; + +#[derive(Default)] +pub struct BankDelta { + /// accounts database + pub accounts: Accounts, + /// entries + last_id_queue: RwLock, + /// status cache + status_cache: RwLock, + frozen: AtomicBool, + fork_id: AtomicUsize, +} + +impl std::fmt::Debug for BankDelta { + fn 
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "BankDelta {{ fork_id: {} }}", self.fork_id()) + } +} + +impl BankDelta { + // last_id id is used by the status_cache to filter duplicate signatures + pub fn new(fork_id: u64, last_id: &Hash) -> Self { + BankDelta { + accounts: Accounts::default(), + last_id_queue: RwLock::new(LastIdQueue::default()), + status_cache: RwLock::new(StatusCache::new(last_id)), + frozen: AtomicBool::new(false), + fork_id: AtomicUsize::new(fork_id as usize), + } + } + /// Create an Bank using a deposit. + pub fn new_from_accounts(fork: u64, accounts: &[(Pubkey, Account)], last_id: &Hash) -> Self { + let bank_state = BankDelta::new(fork, last_id); + for (to, account) in accounts { + bank_state.accounts.store_slow(false, &to, &account); + } + bank_state + } + pub fn store_slow(&self, purge: bool, pubkey: &Pubkey, account: &Account) { + assert!(!self.frozen()); + self.accounts.store_slow(purge, pubkey, account) + } + + /// Forget all signatures. Useful for benchmarking. + pub fn clear_signatures(&self) { + assert!(!self.frozen()); + self.status_cache.write().unwrap().clear(); + } + + /// Return the last entry ID registered. 
+ pub fn last_id(&self) -> Hash {
+ self.last_id_queue
+ .read()
+ .unwrap()
+ .last_id
+ .expect("no last_id has been set")
+ }
+
+ pub fn transaction_count(&self) -> u64 {
+ self.accounts.transaction_count()
+ }
+ pub fn freeze(&self) {
+ info!(
+ "delta {} frozen at {}",
+ self.fork_id.load(Ordering::Relaxed),
+ self.last_id_queue.read().unwrap().tick_height
+ );
+
+ self.frozen.store(true, Ordering::Relaxed);
+ }
+ pub fn frozen(&self) -> bool {
+ self.frozen.load(Ordering::Relaxed)
+ }
+
+ /// Looks through a list of tick heights and stakes, and finds the latest
+ /// tick that has achieved finality
+ pub fn get_confirmation_timestamp(
+ &self,
+ ticks_and_stakes: &mut [(u64, u64)],
+ supermajority_stake: u64,
+ ) -> Option {
+ let last_id_queue = self.last_id_queue.read().unwrap();
+ last_id_queue.get_confirmation_timestamp(ticks_and_stakes, supermajority_stake)
+ }
+ pub fn get_signature_status(&self, signature: &Signature) -> Option> {
+ self.status_cache
+ .read()
+ .unwrap()
+ .get_signature_status(signature)
+ }
+ pub fn has_signature(&self, signature: &Signature) -> bool {
+ self.status_cache.read().unwrap().has_signature(signature)
+ }
+
+ pub fn tick_height(&self) -> u64 {
+ self.last_id_queue.read().unwrap().tick_height
+ }
+
+ /// Tell the bank which Entry IDs exist on the ledger. This function
+ /// assumes subsequent calls correspond to later entries, and will boot
+ /// the oldest ones once its internal cache is full. Once booted, the
+ /// bank will reject transactions using that `last_id`.
+ pub fn register_tick(&self, last_id: &Hash) { + assert!(!self.frozen()); + let mut last_id_queue = self.last_id_queue.write().unwrap(); + inc_new_counter_info!("bank-register_tick-registered", 1); + last_id_queue.register_tick(last_id) + } + pub fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { + self.accounts.lock_accounts(txs) + } + pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { + self.accounts.unlock_accounts(txs, results) + } + pub fn check_age( + &self, + txs: &[Transaction], + lock_results: &[Result<()>], + max_age: usize, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let last_ids = self.last_id_queue.read().unwrap(); + txs.iter() + .zip(lock_results.iter()) + .map(|(tx, lock_res)| { + if lock_res.is_ok() && !last_ids.check_entry_id_age(tx.last_id, max_age) { + error_counters.reserve_last_id += 1; + Err(BankError::LastIdNotFound) + } else { + lock_res.clone() + } + }) + .collect() + } + pub fn check_signatures( + &self, + txs: &[Transaction], + lock_results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let status_cache = self.status_cache.read().unwrap(); + txs.iter() + .zip(lock_results.into_iter()) + .map(|(tx, lock_res)| { + if lock_res.is_ok() && status_cache.has_signature(&tx.signatures[0]) { + error_counters.duplicate_signature += 1; + Err(BankError::DuplicateSignature) + } else { + lock_res + } + }) + .collect() + } + + pub fn first_err(results: &[Result<()>]) -> Result<()> { + for r in results { + r.clone()?; + } + Ok(()) + } + + pub fn commit_transactions( + &self, + subscritpions: &Option>, + txs: &[Transaction], + loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>], + executed: &[Result<()>], + ) { + assert!(!self.frozen()); + let now = Instant::now(); + self.accounts + .store_accounts(true, txs, executed, loaded_accounts); + + // Check account subscriptions and send notifications + if let Some(subs) = subscritpions { + Self::send_account_notifications(subs, txs, 
executed, loaded_accounts); + } + + // once committed there is no way to unroll + let write_elapsed = now.elapsed(); + debug!( + "store: {}us txs_len={}", + duration_as_us(&write_elapsed), + txs.len(), + ); + self.update_transaction_statuses(txs, &executed); + if let Some(subs) = subscritpions { + Self::update_subscriptions(subs, txs, &executed); + } + } + fn send_account_notifications( + subscriptions: &RpcSubscriptions, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], + ) { + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { + continue; + } + + let tx = &txs[i]; + let accs = raccs.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { + subscriptions.check_account(&key, account); + } + } + } + fn update_subscriptions( + subscriptions: &RpcSubscriptions, + txs: &[Transaction], + res: &[Result<()>], + ) { + for (i, tx) in txs.iter().enumerate() { + subscriptions.check_signature(&tx.signatures[0], &res[i]); + } + } + + fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { + assert!(!self.frozen()); + let mut status_cache = self.status_cache.write().unwrap(); + for (i, tx) in txs.iter().enumerate() { + match &res[i] { + Ok(_) => status_cache.add(&tx.signatures[0]), + Err(BankError::LastIdNotFound) => (), + Err(BankError::DuplicateSignature) => (), + Err(BankError::AccountNotFound) => (), + Err(e) => { + status_cache.add(&tx.signatures[0]); + status_cache.save_failure_status(&tx.signatures[0], e.clone()); + } + } + } + } + + pub fn hash_internal_state(&self) -> Hash { + self.accounts.hash_internal_state() + } + pub fn set_genesis_last_id(&self, last_id: &Hash) { + assert!(!self.frozen()); + self.last_id_queue.write().unwrap().genesis_last_id(last_id) + } + + pub fn fork_id(&self) -> u64 { + self.fork_id.load(Ordering::Relaxed) as u64 + } + /// create a new fork for the bank state + pub fn fork(&self, fork_id: u64, 
last_id: &Hash) -> Self { + Self { + accounts: Accounts::default(), + last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().fork()), + status_cache: RwLock::new(StatusCache::new(last_id)), + frozen: AtomicBool::new(false), + fork_id: AtomicUsize::new(fork_id as usize), + } + } + /// consume the delta into the root state + /// self becomes the new root and its fork_id is updated + pub fn merge_into_root(&self, other: Self) { + assert!(self.frozen()); + assert!(other.frozen()); + let (accounts, last_id_queue, status_cache, fork_id) = { + ( + other.accounts, + other.last_id_queue, + other.status_cache, + other.fork_id, + ) + }; + self.accounts.merge_into_root(accounts); + self.last_id_queue + .write() + .unwrap() + .merge_into_root(last_id_queue.into_inner().unwrap()); + self.status_cache + .write() + .unwrap() + .merge_into_root(status_cache.into_inner().unwrap()); + self.fork_id + .store(fork_id.load(Ordering::Relaxed), Ordering::Relaxed); + } + + #[cfg(test)] + pub fn last_ids(&self) -> &RwLock { + &self.last_id_queue + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::bank::BankError; + + #[test] + fn test_first_err() { + assert_eq!(BankDelta::first_err(&[Ok(())]), Ok(())); + assert_eq!( + BankDelta::first_err(&[Ok(()), Err(BankError::DuplicateSignature)]), + Err(BankError::DuplicateSignature) + ); + assert_eq!( + BankDelta::first_err(&[ + Ok(()), + Err(BankError::DuplicateSignature), + Err(BankError::AccountInUse) + ]), + Err(BankError::DuplicateSignature) + ); + assert_eq!( + BankDelta::first_err(&[ + Ok(()), + Err(BankError::AccountInUse), + Err(BankError::DuplicateSignature) + ]), + Err(BankError::AccountInUse) + ); + assert_eq!( + BankDelta::first_err(&[ + Err(BankError::AccountInUse), + Ok(()), + Err(BankError::DuplicateSignature) + ]), + Err(BankError::AccountInUse) + ); + } +} diff --git a/src/bank_fork.rs b/src/bank_fork.rs new file mode 100644 index 00000000000000..d8b86039ed875d --- /dev/null +++ b/src/bank_fork.rs @@ -0,0 +1,467 @@ 
+use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders}; +use crate::bank::{BankError, Result}; +use crate::bank_delta::BankDelta; +use crate::counter::Counter; +use crate::entry::Entry; +use crate::last_id_queue::MAX_ENTRY_IDS; +use crate::poh_recorder::{PohRecorder, PohRecorderError}; +use crate::result::Error; +use crate::rpc_pubsub::RpcSubscriptions; +use hashbrown::HashMap; +use log::Level; +use rayon::prelude::*; +use solana_runtime::{self, RuntimeError}; +use solana_sdk::account::Account; +use solana_sdk::hash::Hash; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use solana_sdk::timing::duration_as_us; +use solana_sdk::transaction::Transaction; +use solana_sdk::vote_program::{self, VoteState}; +use std::sync::Arc; +use std::time::Instant; + +pub struct BankFork { + pub deltas: Vec>, +} + +impl BankFork { + pub fn head(&self) -> &Arc { + self.deltas + .first() + .expect("at least 1 delta needs to be available for the state") + } + fn load_accounts( + &self, + txs: &[Transaction], + results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let accounts: Vec<&Accounts> = self.deltas.iter().map(|c| &c.accounts).collect(); + Accounts::load_accounts(&accounts, txs, results, error_counters) + } + pub fn hash_internal_state(&self) -> Hash { + self.head().hash_internal_state() + } + pub fn transaction_count(&self) -> u64 { + self.head().transaction_count() + } + + pub fn register_tick(&self, last_id: &Hash) { + self.head().register_tick(last_id) + } + + pub fn get_signature_status(&self, signature: &Signature) -> Option> { + self.head().get_signature_status(signature) + } + + pub fn clear_signatures(&self) { + self.head().clear_signatures(); + } + + /// Each program would need to be able to introspect its own state + /// this is hard-coded to the Budget language + pub fn get_balance_slow(&self, pubkey: &Pubkey) -> u64 { + self.load_slow(pubkey) + .map(|x| Self::read_balance(&x)) + .unwrap_or(0) + } 
+ pub fn read_balance(account: &Account) -> u64 { + // TODO: Re-instate budget_program special case? + /* + if budget_program::check_id(&account.owner) { + return budget_program::get_balance(account); + } + */ + account.tokens + } + + pub fn get_account_slow(&self, pubkey: &Pubkey) -> Option { + self.load_slow(pubkey) + } + + pub fn load_slow(&self, pubkey: &Pubkey) -> Option { + let accounts: Vec<&Accounts> = self.deltas.iter().map(|c| &c.accounts).collect(); + Accounts::load_slow(&accounts, pubkey) + } + + pub fn tick_height(&self) -> u64 { + self.head().tick_height() + } + + pub fn last_id(&self) -> Hash { + self.head().last_id() + } + + #[allow(clippy::type_complexity)] + fn load_and_execute_transactions( + &self, + txs: &[Transaction], + lock_results: &[Result<()>], + max_age: usize, + ) -> ( + Vec>, + Vec>, + ) { + let head = &self.deltas[0]; + debug!("processing transactions: {}", txs.len()); + let mut error_counters = ErrorCounters::default(); + let now = Instant::now(); + let age_results = head.check_age(txs, lock_results, max_age, &mut error_counters); + let sig_results = head.check_signatures(txs, age_results, &mut error_counters); + let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); + let tick_height = head.tick_height(); + + let load_elapsed = now.elapsed(); + let now = Instant::now(); + let executed: Vec> = loaded_accounts + .iter_mut() + .zip(txs.iter()) + .map(|(accs, tx)| match accs { + Err(e) => Err(e.clone()), + Ok((ref mut accounts, ref mut loaders)) => { + solana_runtime::execute_transaction(tx, loaders, accounts, tick_height).map_err( + |RuntimeError::ProgramError(index, err)| { + BankError::ProgramError(index, err) + }, + ) + } + }) + .collect(); + + let execution_elapsed = now.elapsed(); + + debug!( + "load: {}us execute: {}us txs_len={}", + duration_as_us(&load_elapsed), + duration_as_us(&execution_elapsed), + txs.len(), + ); + let mut tx_count = 0; + let mut err_count = 0; + for (r, tx) in 
executed.iter().zip(txs.iter()) { + if r.is_ok() { + tx_count += 1; + } else { + if err_count == 0 { + info!("tx error: {:?} {:?}", r, tx); + } + err_count += 1; + } + } + if err_count > 0 { + info!("{} errors of {} txs", err_count, err_count + tx_count); + inc_new_counter_info!( + "bank-process_transactions-account_not_found", + error_counters.account_not_found + ); + inc_new_counter_info!("bank-process_transactions-error_count", err_count); + } + + head.accounts.increment_transaction_count(tx_count); + + inc_new_counter_info!("bank-process_transactions-txs", tx_count); + if 0 != error_counters.last_id_not_found { + inc_new_counter_info!( + "bank-process_transactions-error-last_id_not_found", + error_counters.last_id_not_found + ); + } + if 0 != error_counters.reserve_last_id { + inc_new_counter_info!( + "bank-process_transactions-error-reserve_last_id", + error_counters.reserve_last_id + ); + } + if 0 != error_counters.duplicate_signature { + inc_new_counter_info!( + "bank-process_transactions-error-duplicate_signature", + error_counters.duplicate_signature + ); + } + if 0 != error_counters.insufficient_funds { + inc_new_counter_info!( + "bank-process_transactions-error-insufficient_funds", + error_counters.insufficient_funds + ); + } + (loaded_accounts, executed) + } + + /// Process a batch of transactions. 
+ #[must_use] + pub fn load_execute_record_notify_commit( + &self, + txs: &[Transaction], + recorder: Option<&PohRecorder>, + subs: &Option>, + lock_results: &[Result<()>], + max_age: usize, + ) -> Result>> { + let head = &self.deltas[0]; + let (loaded_accounts, executed) = + self.load_and_execute_transactions(txs, lock_results, max_age); + if let Some(poh) = recorder { + Self::record_transactions(txs, &executed, poh)?; + } + head.commit_transactions(subs, txs, &loaded_accounts, &executed); + Ok(executed) + } + + #[must_use] + pub fn load_execute_record_commit( + &self, + txs: &[Transaction], + recorder: Option<&PohRecorder>, + lock_results: &[Result<()>], + max_age: usize, + ) -> Result>> { + self.load_execute_record_notify_commit(txs, recorder, &None, lock_results, max_age) + } + pub fn process_and_record_transactions( + &self, + subs: &Option>, + txs: &[Transaction], + poh: Option<&PohRecorder>, + ) -> Result>> { + let head = &self.deltas[0]; + let now = Instant::now(); + // Once accounts are locked, other threads cannot encode transactions that will modify the + // same account state + let lock_results = head.lock_accounts(txs); + let lock_time = now.elapsed(); + + let now = Instant::now(); + // Use a shorter maximum age when adding transactions into the pipeline. This will reduce + // the likelihood of any single thread getting starved and processing old ids. + // TODO: Banking stage threads should be prioritized to complete faster then this queue + // expires. 
+ let (loaded_accounts, results) =
+ self.load_and_execute_transactions(txs, &lock_results, MAX_ENTRY_IDS as usize / 2);
+ let load_execute_time = now.elapsed();
+
+ let record_time = {
+ let now = Instant::now();
+ if let Some(recorder) = poh {
+ Self::record_transactions(txs, &results, recorder)?;
+ }
+ now.elapsed()
+ };
+
+ let commit_time = {
+ let now = Instant::now();
+ head.commit_transactions(subs, txs, &loaded_accounts, &results);
+ now.elapsed()
+ };
+
+ let now = Instant::now();
+ // Once the accounts are unlocked, new transactions can enter the pipeline to process them
+ head.unlock_accounts(&txs, &lock_results);
+ let unlock_time = now.elapsed();
+ debug!(
+ "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
+ duration_as_us(&lock_time),
+ duration_as_us(&load_execute_time),
+ duration_as_us(&record_time),
+ duration_as_us(&commit_time),
+ duration_as_us(&unlock_time),
+ txs.len(),
+ );
+ Ok(results)
+ }
+ pub fn record_transactions(
+ txs: &[Transaction],
+ results: &[Result<()>],
+ poh: &PohRecorder,
+ ) -> Result<()> {
+ let processed_transactions: Vec<_> = results
+ .iter()
+ .zip(txs.iter())
+ .filter_map(|(r, x)| match r {
+ Ok(_) => Some(x.clone()),
+ Err(BankError::ProgramError(index, err)) => {
+ info!("program error {:?}, {:?}", index, err);
+ Some(x.clone())
+ }
+ Err(ref e) => {
+ debug!("process transaction failed {:?}", e);
+ None
+ }
+ })
+ .collect();
+ debug!("processed: {} ", processed_transactions.len());
+ // unlock all the accounts with errors which are filtered by the above `filter_map`
+ if !processed_transactions.is_empty() {
+ let hash = Transaction::hash(&processed_transactions);
+ // record and unlock will unlock all the successful transactions
+ poh.record(hash, processed_transactions).map_err(|e| {
+ trace!("record failure: {:?}", e);
+ match e {
+ Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
+ trace!("max_height reached");
+ BankError::MaxHeightReached
+ }
+ _ =>
BankError::RecordFailure, + } + })?; + } + Ok(()) + } + fn ignore_program_errors(results: Vec>) -> Vec> { + results + .into_iter() + .map(|result| match result { + // Entries that result in a ProgramError are still valid and are written in the + // ledger so map them to an ok return value + Err(BankError::ProgramError(index, err)) => { + info!("program error {:?}, {:?}", index, err); + inc_new_counter_info!("bank-ignore_program_err", 1); + Ok(()) + } + _ => result, + }) + .collect() + } + + pub fn par_execute_entries(&self, entries: &[(&Entry, Vec>)]) -> Result<()> { + let head = &self.deltas[0]; + inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); + let results: Vec> = entries + .into_par_iter() + .map(|(e, lock_results)| { + let old_results = self + .load_execute_record_commit(&e.transactions, None, lock_results, MAX_ENTRY_IDS) + .expect("no record failures"); + let results = Self::ignore_program_errors(old_results); + head.unlock_accounts(&e.transactions, &results); + BankDelta::first_err(&results) + }) + .collect(); + BankDelta::first_err(&results) + } + + pub fn vote_states(&self, cond: F) -> Vec + where + F: Fn(&VoteState) -> bool, + { + let mut all_votes: HashMap = HashMap::new(); + self.deltas.iter().for_each(|c| { + c.accounts + .accounts_db + .read() + .unwrap() + .accounts + .values() + .for_each(|account| { + if vote_program::check_id(&account.owner) { + if let Ok(vote_state) = VoteState::deserialize(&account.userdata) { + if cond(&vote_state) && !all_votes.contains_key(&vote_state.node_id) { + all_votes.insert(vote_state.node_id, vote_state); + } + } + } + }); + }); + all_votes.into_iter().map(|(_, v)| v).collect() + } +} +#[cfg(test)] +mod test { + use super::*; + use solana_sdk::native_loader; + use solana_sdk::native_program::ProgramError; + use solana_sdk::signature::Keypair; + use solana_sdk::signature::KeypairUtil; + use solana_sdk::system_program; + use solana_sdk::system_transaction::SystemTransaction; + + /// Create, sign, 
and process a Transaction from `keypair` to `to` of + /// `n` tokens where `last_id` is the last Entry ID observed by the client. + pub fn transfer( + bank: &BankFork, + n: u64, + keypair: &Keypair, + to: Pubkey, + last_id: Hash, + ) -> Result { + let tx = SystemTransaction::new_move(keypair, to, n, last_id, 0); + let signature = tx.signatures[0]; + let e = bank + .process_and_record_transactions(&None, &[tx], None) + .expect("no recorder"); + match &e[0] { + Ok(_) => Ok(signature), + Err(e) => Err(e.clone()), + } + } + + fn new_state(mint: &Keypair, tokens: u64, last_id: &Hash) -> BankFork { + let accounts = [(mint.pubkey(), Account::new(tokens, 0, Pubkey::default()))]; + let bank = Arc::new(BankDelta::new_from_accounts(0, &accounts, &last_id)); + BankFork { deltas: vec![bank] } + } + + fn add_system_program(delta: &BankDelta) { + let system_program_account = native_loader::create_program_account("solana_system_program"); + delta.store_slow(false, &system_program::id(), &system_program_account); + } + + #[test] + fn test_interleaving_locks() { + let last_id = Hash::default(); + let mint = Keypair::new(); + let alice = Keypair::new(); + let bob = Keypair::new(); + let bank = new_state(&mint, 3, &last_id); + bank.head().register_tick(&last_id); + add_system_program(bank.head()); + + let tx1 = SystemTransaction::new_move(&mint, alice.pubkey(), 1, last_id, 0); + let pay_alice = vec![tx1]; + + let locked_alice = bank.head().lock_accounts(&pay_alice); + assert!(locked_alice[0].is_ok()); + let results_alice = bank + .load_execute_record_commit(&pay_alice, None, &locked_alice, MAX_ENTRY_IDS) + .unwrap(); + assert_eq!(results_alice[0], Ok(())); + + // try executing an interleaved transfer twice + assert_eq!( + transfer(&bank, 1, &mint, bob.pubkey(), last_id), + Err(BankError::AccountInUse) + ); + // the second time should fail as well + // this verifies that `unlock_accounts` doesn't unlock `AccountInUse` accounts + assert_eq!( + transfer(&bank, 1, &mint, bob.pubkey(), 
last_id), + Err(BankError::AccountInUse) + ); + + bank.head().unlock_accounts(&pay_alice, &locked_alice); + + assert_matches!(transfer(&bank, 2, &mint, bob.pubkey(), last_id), Ok(_)); + } + #[test] + fn test_bank_ignore_program_errors() { + let expected_results = vec![Ok(()), Ok(())]; + let results = vec![Ok(()), Ok(())]; + let updated_results = BankFork::ignore_program_errors(results); + assert_eq!(updated_results, expected_results); + + let results = vec![ + Err(BankError::ProgramError( + 1, + ProgramError::ResultWithNegativeTokens, + )), + Ok(()), + ]; + let updated_results = BankFork::ignore_program_errors(results); + assert_eq!(updated_results, expected_results); + + // Other BankErrors should not be ignored + let results = vec![Err(BankError::AccountNotFound), Ok(())]; + let updated_results = BankFork::ignore_program_errors(results); + assert_ne!(updated_results, expected_results); + } +} diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 3458c6a2be0b4b..a97fe3f3652cb7 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -135,8 +135,8 @@ impl BankingStage { while chunk_start != transactions.len() { let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]); - let result = - bank.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh); + let result = bank + .process_and_record_transactions(&transactions[chunk_start..chunk_end], Some(poh)); if Err(BankError::MaxHeightReached) == result { break; } diff --git a/src/deltas.rs b/src/deltas.rs new file mode 100644 index 00000000000000..2928c35129e471 --- /dev/null +++ b/src/deltas.rs @@ -0,0 +1,134 @@ +//! Simple data structure to keep track of deltas (partial state updates). It +//! stores a map of forks to a type and parent forks. +//! +//! A root is the fork that is a parent to all the leaf forks. 
+ +use hashbrown::{HashMap, HashSet}; +use std::collections::VecDeque; + +pub struct Deltas { + /// Stores a map from fork to a T and a parent fork + pub deltas: HashMap, +} + +impl Deltas { + pub fn is_empty(&self) -> bool { + self.deltas.is_empty() + } + pub fn load(&self, fork: u64) -> Option<&(T, u64)> { + self.deltas.get(&fork) + } + pub fn store(&mut self, fork: u64, data: T, trunk: u64) { + self.insert(fork, data, trunk); + } + pub fn insert(&mut self, fork: u64, data: T, trunk: u64) { + self.deltas.insert(fork, (data, trunk)); + } + /// Given a base fork, and a maximum number, collect all the + /// forks starting from the base fork backwards + pub fn collect(&self, num: usize, mut base: u64) -> Vec<(u64, &T)> { + let mut rv = vec![]; + loop { + if rv.len() == num { + break; + } + if let Some((val, next)) = self.load(base) { + rv.push((base, val)); + base = *next; + } else { + break; + } + } + rv + } + + ///invert the dag + pub fn invert(&self) -> HashMap> { + let mut idag = HashMap::new(); + for (k, (_, v)) in &self.deltas { + idag.entry(*v).or_insert(HashSet::new()).insert(*k); + } + idag + } + + ///create a new Deltas tree that only derives from the trunk + pub fn prune(&self, trunk: u64, inverse: &HashMap>) -> Self { + let mut new = Self::default(); + // simple BFS + let mut queue = VecDeque::new(); + queue.push_back(trunk); + loop { + if queue.is_empty() { + break; + } + let trunk = queue.pop_front().unwrap(); + let (data, prev) = self.load(trunk).expect("load from inverse").clone(); + new.store(trunk, data.clone(), prev); + if let Some(children) = inverse.get(&trunk) { + let mut next = children.into_iter().cloned().collect(); + queue.append(&mut next); + } + } + new + } +} + +impl Default for Deltas { + fn default() -> Self { + Self { + deltas: HashMap::new(), + } + } +} +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new() { + let cp: Deltas = Deltas::default(); + assert!(cp.is_empty()); + } + + #[test] + fn test_load_store() { + let 
mut cp: Deltas = Deltas::default();
+ assert_eq!(cp.load(1), None);
+ cp.store(1, true, 0);
+ assert_eq!(cp.load(1), Some(&(true, 0)));
+ }
+ #[test]
+ fn test_collect() {
+ let mut cp: Deltas = Deltas::default();
+ assert_eq!(cp.load(1), None);
+ cp.store(1, true, 0);
+ assert_eq!(cp.collect(0, 1), vec![]);
+ assert_eq!(cp.collect(1, 1), vec![(1, &true)]);
+ }
+ #[test]
+ fn test_invert() {
+ let mut cp: Deltas = Deltas::default();
+ assert_eq!(cp.load(1), None);
+ cp.store(1, true, 0);
+ cp.store(2, true, 0);
+ let inverse = cp.invert();
+ assert_eq!(inverse.len(), 1);
+ assert_eq!(inverse[&0].len(), 2);
+ let list: Vec = inverse[&0].iter().cloned().collect();
+ assert_eq!(list, vec![1, 2]);
+ }
+ #[test]
+ fn test_prune() {
+ let mut cp: Deltas = Deltas::default();
+ assert_eq!(cp.load(1), None);
+ cp.store(1, true, 0);
+ cp.store(2, true, 0);
+ cp.store(3, true, 1);
+ let inverse = cp.invert();
+ let pruned = cp.prune(1, &inverse);
+ assert_eq!(pruned.load(0), None);
+ assert_eq!(pruned.load(1), Some(&(true, 0)));
+ assert_eq!(pruned.load(2), None);
+ assert_eq!(pruned.load(3), Some(&(true, 1)));
+ }
+}
diff --git a/src/forks.rs b/src/forks.rs
new file mode 100644
index 00000000000000..c004837366ebba
--- /dev/null
+++ b/src/forks.rs
@@ -0,0 +1,263 @@
+use crate::bank_delta::BankDelta;
+/// This module tracks the forks in the bank
+use crate::bank_fork::BankFork;
+use std::sync::Arc;
+//TODO: own module error
+use crate::deltas::Deltas;
+use solana_sdk::hash::Hash;
+use std;
+use std::result;
+
+pub const ROLLBACK_DEPTH: usize = 32usize;
+
+/// Reasons a fork operation might be rejected.
+#[derive(Debug, PartialEq, Eq, Clone)] +pub enum ForksError { + /// Fork is not in the Deltas DAG + UnknownFork, + + /// The specified trunk is not in the Deltas DAG + InvalidTrunk, + + /// Specified base delta is still live + DeltaNotFrozen, + + /// Requested live delta is frozen + DeltaIsFrozen, +} + +pub type Result = result::Result; + +#[derive(Default)] +pub struct Forks { + pub deltas: Deltas>, + + /// Last fork to be initialized + /// This should be the last fork to be replayed or the TPU fork + pub active_fork: u64, + + /// Fork that is root + pub root: u64, +} + +impl Forks { + pub fn active_fork(&self) -> BankFork { + info!("getting active: {}", self.active_fork); + self.fork(self.active_fork).expect("live fork") + } + pub fn root(&self) -> BankFork { + info!("getting root: {}", self.root); + self.fork(self.root).expect("root fork") + } + + pub fn fork(&self, fork: u64) -> Option { + info!("getting fork: {}", fork); + let cp: Vec<_> = self + .deltas + .collect(ROLLBACK_DEPTH + 1, fork) + .into_iter() + .map(|x| x.1) + .cloned() + .collect(); + if cp.is_empty() { + None + } else { + Some(BankFork { deltas: cp }) + } + } + /// Collapse the bottom two deltas. + /// The tree is computed from the `leaf` to the `root` + /// The path from `leaf` to the `root` is the active chain. + /// The leaf is the last possible fork, it should have no descendants. + /// The direct child of the root that leads the leaf becomes the new root. + /// The forks that are not a descendant of the new root -> leaf path are pruned. + /// active_fork is the leaf. + /// root is the new root. + /// Return the new root id. 
+ pub fn merge_into_root(&mut self, max_depth: usize, leaf: u64) -> Result> { + // `old` root, should have `root` as its fork_id + // `new` root is a direct descendant of old and has new_root_id as its fork_id + // new is merged into old + // and old is swapped into the delta under new_root_id + let merge_root = { + let active_fork = self.deltas.collect(ROLLBACK_DEPTH + 1, leaf); + let leaf_id = active_fork + .first() + .map(|x| x.0) + .ok_or(ForksError::UnknownFork)?; + assert_eq!(leaf_id, leaf); + let len = active_fork.len(); + if len > max_depth { + let old_root = active_fork[len - 1]; + let new_root = active_fork[len - 2]; + if !new_root.1.frozen() { + trace!("new_root id {}", new_root.1.fork_id()); + return Err(ForksError::DeltaNotFrozen); + } + if !old_root.1.frozen() { + trace!("old id {}", old_root.1.fork_id()); + return Err(ForksError::DeltaNotFrozen); + } + //stupid sanity checks + assert_eq!(new_root.1.fork_id(), new_root.0); + assert_eq!(old_root.1.fork_id(), old_root.0); + Some((old_root.1.clone(), new_root.1.clone(), new_root.0)) + } else { + None + } + }; + if let Some((old_root, new_root, new_root_id)) = merge_root { + let idag = self.deltas.invert(); + let new_deltas = self.deltas.prune(new_root_id, &idag); + let old_root_id = old_root.fork_id(); + self.deltas = new_deltas; + self.root = new_root_id; + self.active_fork = leaf; + // old should have been pruned + assert!(self.deltas.load(old_root_id).is_none()); + // new_root id should be in the new tree + assert!(!self.deltas.load(new_root_id).is_none()); + + // swap in the old instance under the new_root id + // this should be the last external ref to `new_root` + self.deltas + .insert(new_root_id, old_root.clone(), old_root_id); + + // merge all the new changes into the old instance under the new id + // this should consume `new` + // new should have no other references + let new_root: BankDelta = Arc::try_unwrap(new_root).unwrap(); + old_root.merge_into_root(new_root); + 
assert_eq!(old_root.fork_id(), new_root_id); + Ok(Some(new_root_id)) + } else { + Ok(None) + } + } + + /// Initialize the first root + pub fn init_root(&mut self, delta: BankDelta) { + assert!(self.deltas.is_empty()); + self.active_fork = delta.fork_id(); + self.root = delta.fork_id(); + //TODO: using u64::MAX as the impossible delta + //this should be a None instead + self.deltas + .store(self.active_fork, Arc::new(delta), std::u64::MAX); + } + + pub fn is_active_fork(&self, fork: u64) -> bool { + if let Some(state) = self.deltas.load(fork) { + !state.0.frozen() && self.active_fork == fork + } else { + false + } + } + /// Initialize the `current` fork that is a direct descendant of the `base` fork. + pub fn init_fork(&mut self, current: u64, last_id: &Hash, base: u64) -> Result<()> { + info!("initing fork {} from {}", current, base); + if let Some(state) = self.deltas.load(base) { + if !state.0.frozen() { + return Err(ForksError::DeltaNotFrozen); + } + let new = state.0.fork(current, last_id); + self.deltas.store(current, Arc::new(new), base); + self.active_fork = current; + Ok(()) + } else { + return Err(ForksError::UnknownFork); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_sdk::hash::hash; + + #[test] + fn forks_init_root() { + let mut forks = Forks::default(); + let cp = BankDelta::new(0, &Hash::default()); + forks.init_root(cp); + assert!(forks.is_active_fork(0)); + assert_eq!(forks.root().deltas.len(), 1); + assert_eq!(forks.root().head().fork_id(), 0); + assert_eq!(forks.active_fork().head().fork_id(), 0); + } + + #[test] + fn forks_init_fork() { + let mut forks = Forks::default(); + let last_id = Hash::default(); + let cp = BankDelta::new(0, &last_id); + cp.register_tick(&last_id); + forks.init_root(cp); + let last_id = hash(last_id.as_ref()); + assert_eq!( + forks.init_fork(1, &last_id, 1), + Err(ForksError::UnknownFork) + ); + assert_eq!( + forks.init_fork(1, &last_id, 0), + Err(ForksError::DeltaNotFrozen) + ); + 
forks.root().head().freeze(); + assert_eq!(forks.init_fork(1, &last_id, 0), Ok(())); + + assert_eq!(forks.root().head().fork_id(), 0); + assert_eq!(forks.active_fork().head().fork_id(), 1); + assert_eq!(forks.active_fork().deltas.len(), 2); + } + + #[test] + fn forks_merge() { + let mut forks = Forks::default(); + let last_id = Hash::default(); + let cp = BankDelta::new(0, &last_id); + cp.register_tick(&last_id); + forks.init_root(cp); + let last_id = hash(last_id.as_ref()); + forks.root().head().freeze(); + assert_eq!(forks.init_fork(1, &last_id, 0), Ok(())); + forks.active_fork().head().register_tick(&last_id); + forks.active_fork().head().freeze(); + assert_eq!(forks.merge_into_root(2, 1), Ok(None)); + assert_eq!(forks.merge_into_root(1, 1), Ok(Some(1))); + + assert_eq!(forks.active_fork().deltas.len(), 1); + assert_eq!(forks.root().head().fork_id(), 1); + assert_eq!(forks.active_fork().head().fork_id(), 1); + } + #[test] + fn forks_merge_prune() { + let mut forks = Forks::default(); + let last_id = Hash::default(); + let cp = BankDelta::new(0, &last_id); + cp.register_tick(&last_id); + forks.init_root(cp); + let last_id = hash(last_id.as_ref()); + forks.root().head().freeze(); + assert_eq!(forks.init_fork(1, &last_id, 0), Ok(())); + assert_eq!(forks.fork(1).unwrap().deltas.len(), 2); + forks.fork(1).unwrap().head().register_tick(&last_id); + + // add a fork 2 to be pruned + // fork 2 connects to 0 + let last_id = hash(last_id.as_ref()); + assert_eq!(forks.init_fork(2, &last_id, 0), Ok(())); + assert_eq!(forks.fork(2).unwrap().deltas.len(), 2); + forks.fork(2).unwrap().head().register_tick(&last_id); + + forks.fork(1).unwrap().head().freeze(); + // fork 1 is the new root, only forks that are descendant from 1 are valid + assert_eq!(forks.merge_into_root(1, 1), Ok(Some(1))); + + // fork 2 is gone since it does not connect to 1 + assert!(forks.fork(2).is_none()); + + assert_eq!(forks.active_fork().deltas.len(), 1); + assert_eq!(forks.root().head().fork_id(), 1); + 
assert_eq!(forks.active_fork().head().fork_id(), 1); + } +} diff --git a/src/fullnode.rs b/src/fullnode.rs index 4839e02bff85a2..d4a1a420ae3a7e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -364,8 +364,9 @@ impl Fullnode { None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here... }; + self.bank.copy_for_tpu(); self.node_services.tpu.switch_to_leader( - &Arc::new(self.bank.copy_for_tpu()), + &self.bank, PohServiceConfig::default(), self.tpu_sockets .iter() @@ -921,7 +922,8 @@ mod tests { info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)"); { - let w_last_ids = leader.bank.last_ids().write().unwrap(); + let fork_head = leader.bank.test_active_fork(); + let w_last_ids = fork_head.head().last_ids().write().unwrap(); assert!(w_last_ids.tick_height < ticks_per_slot - 1); } diff --git a/src/lib.rs b/src/lib.rs index e063f5e23ab898..1c0316f05338c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,8 @@ pub mod counter; pub mod accounts; pub mod bank; +pub mod bank_delta; +pub mod bank_fork; pub mod banking_stage; pub mod blob_fetch_stage; pub mod bloom; @@ -27,6 +29,7 @@ pub mod crds_gossip_error; pub mod crds_gossip_pull; pub mod crds_gossip_push; pub mod crds_value; +pub mod deltas; #[macro_use] pub mod contact_info; pub mod blocktree; @@ -40,6 +43,7 @@ pub mod entry_stream_stage; #[cfg(feature = "erasure")] pub mod erasure; pub mod fetch_stage; +pub mod forks; pub mod fullnode; pub mod gen_keys; pub mod genesis_block; diff --git a/src/status_cache.rs b/src/status_cache.rs index e1fa0e9a1c3f78..a4992361adae32 100644 --- a/src/status_cache.rs +++ b/src/status_cache.rs @@ -60,6 +60,7 @@ impl StatusCache { pub fn clear(&mut self) { self.failures.clear(); self.signatures.clear(); + self.merges = VecDeque::new(); } fn get_signature_status_merged(&self, sig: &Signature) -> Option> { for c in &self.merges {