Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Adds AtomicAge to bucket map holder #33841

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
accounts_index_storage::{AccountsIndexStorage, Startup},
accounts_partition::RentPayingAccountsByPartition,
ancestors::Ancestors,
bucket_map_holder::{Age, BucketMapHolder},
bucket_map_holder::{Age, AtomicAge, BucketMapHolder},
contains::Contains,
in_mem_accounts_index::{InMemAccountsIndex, InsertNewEntryResults, StartupStats},
inline_spl_token::{self, GenericTokenAccount},
Expand Down Expand Up @@ -36,7 +36,7 @@ use {
},
path::PathBuf,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, OnceLock, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
},
Expand Down Expand Up @@ -238,7 +238,7 @@ pub struct AccountMapEntryMeta {
/// true if entry in in-mem idx has changes and needs to be written to disk
pub dirty: AtomicBool,
/// 'age' at which this entry should be purged from the cache (implements lru)
pub age: AtomicU8,
pub age: AtomicAge,
}

impl AccountMapEntryMeta {
Expand All @@ -248,15 +248,15 @@ impl AccountMapEntryMeta {
) -> Self {
AccountMapEntryMeta {
dirty: AtomicBool::new(true),
age: AtomicU8::new(storage.future_age_to_flush(is_cached)),
age: AtomicAge::new(storage.future_age_to_flush(is_cached)),
}
}
pub fn new_clean<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
storage: &Arc<BucketMapHolder<T, U>>,
) -> Self {
AccountMapEntryMeta {
dirty: AtomicBool::new(false),
age: AtomicU8::new(storage.future_age_to_flush(false)),
age: AtomicAge::new(storage.future_age_to_flush(false)),
}
}
}
Expand Down Expand Up @@ -2113,7 +2113,7 @@ pub mod tests {
let (slot, account_info) = entry.slot_list.read().unwrap()[0];
let meta = AccountMapEntryMeta {
dirty: AtomicBool::new(entry.dirty()),
age: AtomicU8::new(entry.age()),
age: AtomicAge::new(entry.age()),
};
PreAllocatedAccountMapEntry::Entry(Arc::new(AccountMapEntryInner::new(
vec![(slot, account_info)],
Expand Down
16 changes: 9 additions & 7 deletions accounts-db/src/bucket_map_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use {
},
};
pub type Age = u8;
pub type AtomicAge = AtomicU8;
const _: () = assert!(std::mem::size_of::<Age>() == std::mem::size_of::<AtomicAge>());

const AGE_MS: u64 = DEFAULT_MS_PER_SLOT; // match one age per slot time

Expand All @@ -37,12 +39,12 @@ pub struct BucketMapHolder<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
/// Instead of accessing the single age and doing math each time, each value is incremented each time the age occurs, which is ~400ms.
/// Callers can ask for the precomputed value they already want.
/// rolling 'current' age
pub age: AtomicU8,
pub age: AtomicAge,
/// rolling age that is 'ages_to_stay_in_cache' + 'age'
pub future_age_to_flush: AtomicU8,
pub future_age_to_flush: AtomicAge,
/// rolling age that is effectively 'age' - 1
/// these items are expected to be flushed from the accounts write cache or otherwise modified before this age occurs
pub future_age_to_flush_cached: AtomicU8,
pub future_age_to_flush_cached: AtomicAge,

pub stats: BucketMapHolderStats,

Expand Down Expand Up @@ -255,11 +257,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> BucketMapHolder<T, U>
ages_to_stay_in_cache,
count_buckets_flushed: AtomicUsize::default(),
// age = 0
age: AtomicU8::default(),
age: AtomicAge::default(),
// future age = age (=0) + ages_to_stay_in_cache
future_age_to_flush: AtomicU8::new(ages_to_stay_in_cache),
future_age_to_flush: AtomicAge::new(ages_to_stay_in_cache),
// effectively age (0) - 1. So, the oldest possible age from 'now'
future_age_to_flush_cached: AtomicU8::new(0_u8.wrapping_sub(1)),
future_age_to_flush_cached: AtomicAge::new(Age::MAX),
stats: BucketMapHolderStats::new(bins),
wait_dirty_or_aged: Arc::default(),
next_bucket_to_flush: AtomicUsize::new(0),
Expand Down Expand Up @@ -442,7 +444,7 @@ pub mod tests {
let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
assert_eq!(0, test.current_age());
assert_eq!(test.ages_to_stay_in_cache, test.future_age_to_flush(false));
assert_eq!(u8::MAX, test.future_age_to_flush(true));
assert_eq!(Age::MAX, test.future_age_to_flush(true));
(0..bins).for_each(|_| {
test.bucket_flushed_at_current_age(false);
});
Expand Down
8 changes: 4 additions & 4 deletions accounts-db/src/bucket_map_holder_stats.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use {
crate::{
accounts_index::{DiskIndexValue, IndexValue},
bucket_map_holder::BucketMapHolder,
bucket_map_holder::{Age, AtomicAge, BucketMapHolder},
},
solana_sdk::timing::AtomicInterval,
std::{
fmt::Debug,
sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering},
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
},
};

Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct BucketMapHolderStats {
pub flush_entries_evicted_from_mem: AtomicU64,
pub active_threads: AtomicU64,
pub get_range_us: AtomicU64,
last_age: AtomicU8,
last_age: AtomicAge,
last_ages_flushed: AtomicU64,
pub flush_scan_us: AtomicU64,
pub flush_update_us: AtomicU64,
Expand Down Expand Up @@ -120,7 +120,7 @@ impl BucketMapHolderStats {
let mut age_now = age_now as u64;
if last_age > age_now {
// age wrapped
age_now += u8::MAX as u64 + 1;
age_now += Age::MAX as u64 + 1;
}
let age_delta = age_now.saturating_sub(last_age);
if age_delta > 0 {
Expand Down
12 changes: 6 additions & 6 deletions accounts-db/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
AccountMapEntry, AccountMapEntryInner, AccountMapEntryMeta, DiskIndexValue, IndexValue,
PreAllocatedAccountMapEntry, RefCount, SlotList, UpsertReclaim, ZeroLamport,
},
bucket_map_holder::{Age, BucketMapHolder},
bucket_map_holder::{Age, AtomicAge, BucketMapHolder},
bucket_map_holder_stats::BucketMapHolderStats,
waitable_condvar::WaitableCondvar,
},
Expand All @@ -17,7 +17,7 @@ use {
fmt::Debug,
ops::{Bound, RangeBounds, RangeInclusive},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock, RwLockWriteGuard,
},
},
Expand Down Expand Up @@ -89,7 +89,7 @@ impl<T: IndexValue> PossibleEvictions<T> {

// one instance of this represents one bin of the accounts index.
pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
last_age_flushed: AtomicU8,
last_age_flushed: AtomicAge,

// backing store
map_internal: RwLock<InMemMap<T>>,
Expand All @@ -115,7 +115,7 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<

/// how many more ages to skip before this bucket is flushed (as opposed to being skipped).
/// When this reaches 0, this bucket is flushed.
remaining_ages_to_skip_flushing: AtomicU8,
remaining_ages_to_skip_flushing: AtomicAge,

/// an individual bucket will evict its entries and write to disk every 1/NUM_AGES_TO_DISTRIBUTE_FLUSHES ages
/// Higher numbers mean we flush less buckets/s
Expand Down Expand Up @@ -181,12 +181,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
stop_evictions: AtomicU64::default(),
flushing_active: AtomicBool::default(),
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
last_age_flushed: AtomicU8::new(Age::MAX),
last_age_flushed: AtomicAge::new(Age::MAX),
startup_info: StartupInfo::default(),
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
remaining_ages_to_skip_flushing: AtomicU8::new(
remaining_ages_to_skip_flushing: AtomicAge::new(
thread_rng().gen_range(0..num_ages_to_distribute_flushes),
),
num_ages_to_distribute_flushes,
Expand Down