zcache stats fixes (openzfs#113)
no abbreviation: InsertForSpecRead -> InsertForSpeculativeRead
ingestion buffers 256 million bytes -> 256MiB
evictions were being over-counted by add_to_index_or_evict()
only reads should count as hits/misses (not writes that need to check in cache before ingesting)
update PendingChanges stat when adding a pending change via ingestion
extract buffer space management to its own function
ahrens authored Jan 19, 2022
1 parent f998f1b commit 40e2a8a
Showing 3 changed files with 86 additions and 54 deletions.
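
A note before the diffs: the buffer-space management extracted by this commit (reserve_buffer_space(), in the last file below) is built on tokio semaphores whose permit count is a byte budget. Inserts for reads block until buffer space frees up, while inserts for writes, speculative reads, and healing fail fast and are dropped when the buffer is full. The following is a minimal runnable sketch of that pattern; the reserve() helper, BUFFER_BYTES constant, and blocking flag are hypothetical stand-ins, not the zettacache API.

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

// Hypothetical budget; the real tunables default to 256MiB (256 * 1024 * 1024).
const BUFFER_BYTES: usize = 256 * 1024 * 1024;

/// Reserve `len` bytes of insert-buffer space. Blocking callers wait for
/// space; non-blocking callers get None and simply drop the insert.
async fn reserve(
    buffer: &Arc<Semaphore>,
    len: u32,
    blocking: bool,
) -> Option<OwnedSemaphorePermit> {
    if blocking {
        // Wait until `len` permits (bytes) are available.
        let permit = buffer
            .clone()
            .acquire_many_owned(len)
            .await
            .expect("semaphore unexpectedly closed");
        Some(permit)
    } else {
        match buffer.clone().try_acquire_many_owned(len) {
            Ok(permit) => Some(permit),
            // Buffer full: the caller counts a drop stat and returns.
            Err(TryAcquireError::NoPermits) => None,
            Err(e) => panic!("unexpected semaphore error: {:?}", e),
        }
    }
}

#[tokio::main]
async fn main() {
    let buffer = Arc::new(Semaphore::new(BUFFER_BYTES));
    // Holding the permit models an in-flight buffered write; dropping it
    // when the write to disk completes returns the bytes to the pool.
    let permit = reserve(&buffer, 128 * 1024, true).await.unwrap();
    assert_eq!(buffer.available_permits(), BUFFER_BYTES - 128 * 1024);
    drop(permit);
    assert_eq!(buffer.available_permits(), BUFFER_BYTES);
}

Keeping two separate semaphores, as the diff does with blocking_buffer_bytes_available and nonblocking_buffer_bytes_available, prevents a burst of best-effort inserts from starving the must-cache read path.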
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/util/src/zettacache_stats.rs
@@ -361,7 +361,7 @@ pub enum CacheStatCounter {
     InsertBytes,
     InsertForRead,
     InsertForWrite,
-    InsertForSpecRead,
+    InsertForSpeculativeRead,
     InsertForHealing,
     InsertDropQueueFull,
     InsertDropLockBusy, // pending, DOSE-905
7 changes: 5 additions & 2 deletions cmd/zfs_object_agent/zcache/src/stats.rs
@@ -250,7 +250,7 @@ impl StatsDisplay {
         // INSERTS
         let inserts = (values.value(InsertForRead)
             + values.value(InsertForWrite)
-            + values.value(InsertForSpecRead)
+            + values.value(InsertForSpeculativeRead)
             + values.value(InsertForHealing)) as f64
             * scale;
         self.display_count(inserts);
@@ -260,7 +260,10 @@ impl StatsDisplay {
         if self.show_insert_detail {
             self.display_percent(values.value(InsertForRead) as f64 * scale, inserts);
             self.display_percent(values.value(InsertForWrite) as f64 * scale, inserts);
-            self.display_percent(values.value(InsertForSpecRead) as f64 * scale, inserts);
+            self.display_percent(
+                values.value(InsertForSpeculativeRead) as f64 * scale,
+                inserts,
+            );
             self.display_count(values.value(InsertDropQueueFull) as f64 * scale);
             self.display_count(values.value(InsertDropLockBusy) as f64 * scale);
         }
131 changes: 80 additions & 51 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -41,6 +41,7 @@ use std::time::Duration;
 use std::time::Instant;
 use sysinfo::System;
 use sysinfo::SystemExt;
+use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 use tokio::time::{sleep_until, timeout_at};
 use util::get_tunable;
@@ -67,8 +68,8 @@ lazy_static! {
     static ref HIGH_WATER_CACHE_SIZE_PCT: u64 = get_tunable("high_water_cache_size_pct", 82);
     static ref GHOST_CACHE_SIZE_PCT: u64 = get_tunable("ghost_cache_size_pct", 100);
     static ref QUANTILES_IN_SIZE_HISTOGRAM: usize = get_tunable("quantiles_in_size_histogram", 100);
-    static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256_000_000);
-    static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256_000_000);
+    static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256 * 1024 * 1024);
+    static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256 * 1024 * 1024);
     static ref INDEX_CACHE_ENTRIES_MEM_PCT: usize = get_tunable("index_cache_entries_mem_pct", 10);
     static ref DISK_EXPAND_MIN_PCT: f64 = get_tunable("disk_expand_min_pct", 10.0);

@@ -262,6 +263,7 @@ impl MergeState {
                     .unwrap(),
             );
             ghost_entry.value.location = None;
+            self.stats.track_count(Evictions);
         }
         if ghost_entry.value.atime >= self.ghost_cutoff {
             // Preserve ghost entry to our ghost history
@@ -270,7 +272,6 @@
             // Entry is now gone from Index, update our traversal postion
             index.update_last_key(ghost_entry.key);
         }
-        self.stats.track_count(Evictions);
     }
 }

@@ -587,6 +588,7 @@ pub enum LookupResponse {
     Absent(LockedKey),
 }
 
+#[derive(Clone, Copy, Debug)]
 pub enum InsertSource {
     Heal,
     Read,
@@ -1238,7 +1240,7 @@ impl ZettaCache {
         // so that the index can't change after we get the value from it.
         // Lock ordering requires that we lock the index before locking the state.
         let index = self.index.read().await;
-        let read_data_fut_opt = {
+        let fut_or_f = {
             // We don't want to hold the state lock while reading from disk so we
             // use lock_non_send() to ensure that we can't hold it across .await.
             let mut state = self.state.lock_non_send().await;
@@ -1299,11 +1301,13 @@ impl ZettaCache {
             }
         };
 
-        let f = match read_data_fut_opt {
-            Either::Left(read_data_fut) => {
-                // pending state tells us what to do
-                let result = read_data_fut.await;
-                if matches!(source, LookupSource::Read | LookupSource::Write) {
+        let f = match fut_or_f {
+            Either::Left(fut) => {
+                // Got the index entry from pending state or index cache and
+                // already called f(). Now that we've dropped the state lock,
+                // run the future that it returned.
+                let result = fut.await;
+                if matches!(source, LookupSource::Read) {
                     self.stats.track_count(CacheHitWithoutIndexRead);
                 }
                 return result;
@@ -1318,7 +1322,7 @@
 
         // TODO -- is CacheMissWithoutIndexRead possible anymore? See DOSE-939
         let stat_counter;
-        let read_data_fut = match index.log.lookup_by_key(key, |entry| entry.key).await {
+        let fut = match index.log.lookup_by_key(key, |entry| entry.key).await {
             None => {
                 // key not in index
                 stat_counter = CacheMissAfterIndexRead;
@@ -1346,63 +1350,78 @@
                 f(&mut state, value)
             }
         };
-        let result = read_data_fut.await;
-        // Update relevant stat after waiting
+        let result = fut.await;
 
-        if matches!(source, LookupSource::Read | LookupSource::Write) {
+        // Update relevant stat after waiting
+        if matches!(source, LookupSource::Read) {
             self.stats.track_count(stat_counter);
         }
         result
     }
 
-    /// Initiates insertion of this block; doesn't wait for the write to disk.
-    pub async fn insert(&self, locked_key: LockedKey, bytes: AlignedBytes, source: InsertSource) {
-        // The passed in buffer is only for a single block, which is capped to SPA_MAXBLOCKSIZE,
-        // and thus we should never have an issue converting the length to a "u32" here.
-        let len = u32::try_from(bytes.len()).unwrap();
-
-        self.stats.track_instantaneous(
-            BlockingBufferBytesAvailable,
-            (*CACHE_INSERT_BLOCKING_BUFFER_BYTES
-                - self.blocking_buffer_bytes_available.available_permits()) as u64,
-        );
-        self.stats.track_instantaneous(
-            NonblockingBufferBytesAvailable,
-            (*CACHE_INSERT_NONBLOCKING_BUFFER_BYTES
-                - self.nonblocking_buffer_bytes_available.available_permits()) as u64,
-        );
-
-        // This permit will be dropped when the write to disk completes. It
+    async fn reserve_buffer_space(
+        &self,
+        bytes: usize,
+        source: InsertSource,
+    ) -> Option<OwnedSemaphorePermit> {
+        // The permit should be dropped when the write to disk completes. It
         // serves to limit the number of insert()'s that we can buffer before
         // dropping (ignoring) insertion requests.
-        let insert_permit = match source {
+        let bytes32 = u32::try_from(bytes).unwrap();
+        match source {
             InsertSource::Heal | InsertSource::SpeculativeRead | InsertSource::Write => match self
                 .nonblocking_buffer_bytes_available
                 .clone()
-                .try_acquire_many_owned(len)
+                .try_acquire_many_owned(bytes32)
             {
-                Ok(permit) => permit,
-                Err(tokio::sync::TryAcquireError::NoPermits) => {
-                    self.stats.track_count(InsertDropQueueFull);
-                    return;
+                Ok(permit) => {
+                    self.stats.track_instantaneous(
+                        NonblockingBufferBytesAvailable,
+                        (*CACHE_INSERT_NONBLOCKING_BUFFER_BYTES
+                            - self.nonblocking_buffer_bytes_available.available_permits())
+                            as u64,
+                    );
+                    Some(permit)
                 }
+                Err(tokio::sync::TryAcquireError::NoPermits) => None,
                 Err(e) => panic!("unexpected error from try_acquire_many_owned: {:?}", e),
             },
-            InsertSource::Read => match self
-                .blocking_buffer_bytes_available
-                .clone()
-                .acquire_many_owned(len)
-                .await
-            {
-                Ok(permit) => permit,
-                Err(e) => panic!("unexpected error from acquire_many_owned: {:?}", e),
-            },
+            InsertSource::Read => {
+                let permit = self
+                    .blocking_buffer_bytes_available
+                    .clone()
+                    .acquire_many_owned(bytes32)
+                    .await
+                    .expect("error from acquire_many_owned");
+                self.stats.track_instantaneous(
+                    BlockingBufferBytesAvailable,
+                    (*CACHE_INSERT_BLOCKING_BUFFER_BYTES
+                        - self.blocking_buffer_bytes_available.available_permits())
+                        as u64,
+                );
+                Some(permit)
+            }
+        }
+    }
+
+    /// Initiates insertion of this block; doesn't wait for the write to disk.
+    pub async fn insert(&self, locked_key: LockedKey, bytes: AlignedBytes, source: InsertSource) {
+        // This permit will be dropped when the write to disk completes. It
+        // serves to limit the number of insert()'s that we can buffer before
+        // dropping (ignoring) insertion requests.
+        let insert_permit = match self.reserve_buffer_space(bytes.len(), source).await {
+            Some(permit) => permit,
+            None => {
+                self.stats.track_count(InsertDropQueueFull);
+                return;
+            }
         };
 
         self.stats.track_bytes(InsertBytes, bytes.len() as u64);
         self.stats.track_count(match source {
             InsertSource::Heal => InsertForHealing,
             InsertSource::Read => InsertForRead,
-            InsertSource::SpeculativeRead => InsertForSpecRead,
+            InsertSource::SpeculativeRead => InsertForSpeculativeRead,
             InsertSource::Write => InsertForWrite,
         });
 
@@ -1807,8 +1826,7 @@ impl ZettaCacheState {
                 // don't have to traverse the tree again.
                 self.pending_changes
                     .insert(key, PendingChange::UpdateAtime(value, original_atime));
-                self.stats
-                    .track_instantaneous(PendingChanges, self.pending_changes.len() as u64);
+                self.update_pending_stats();
             }
         }
         if matches!(source, LookupSource::Read) {
Expand Down Expand Up @@ -1848,6 +1866,17 @@ impl ZettaCacheState {
}
}

fn update_pending_stats(&self) {
let old_pending = match &self.merge {
Some(ms) => ms.old_pending_changes.len() as u64,
None => 0,
};
self.stats.track_instantaneous(
PendingChanges,
old_pending + self.pending_changes.len() as u64,
);
}

fn remove_from_index(&mut self, key: IndexKey, value: IndexValue) {
let mut oplog_value = value;
match self.pending_changes.get_mut(&key) {
@@ -1903,8 +1932,7 @@
         self.atime_histogram.remove(value);
         self.operation_log
             .append(OperationLogEntry::Remove(key, oplog_value));
-        self.stats
-            .track_instantaneous(PendingChanges, self.pending_changes.len() as u64);
+        self.update_pending_stats();
     }
 
     /// Insert this block to the cache, if space and performance parameters
@@ -1952,6 +1980,7 @@
             btree_map::Entry::Vacant(ve) => {
                 super_trace!("adding Insert to pending_changes {:?} {:?}", key, value);
                 ve.insert(PendingChange::Insert(value));
+                self.update_pending_stats();
             }
         }
         self.atime_histogram.insert(value);
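
A note on the lookup changes above: fut_or_f holds either a future built while the state lock was held (Either::Left) or the callback to run after the on-disk index read, and the hit/miss counters are now bumped only for LookupSource::Read, since a write-path lookup merely checks whether the block is already cached before ingesting it. The sketch below shows the lock-then-await shape in isolation; FutOrFallback, lookup(), and the Vec-backed state are hypothetical stand-ins for the real Either and ZettaCacheState.

use std::sync::Mutex;

// Ready carries a ready-to-run future built under the lock; NeedIndexRead
// means "fall through to the on-disk index read".
enum FutOrFallback<F> {
    Ready(F),
    NeedIndexRead,
}

async fn lookup(state: &Mutex<Vec<u64>>, key: u64) -> Option<u64> {
    let fut_or_f = {
        // Decide under the lock, but do no I/O here; the guard must be
        // dropped before any .await.
        let guard = state.lock().unwrap();
        if guard.contains(&key) {
            FutOrFallback::Ready(async move { Some(key) })
        } else {
            FutOrFallback::NeedIndexRead
        }
    }; // guard dropped here, before anything is awaited
    match fut_or_f {
        FutOrFallback::Ready(fut) => fut.await, // hit without an index read
        FutOrFallback::NeedIndexRead => None,   // real code reads the index log here
    }
}

#[tokio::main]
async fn main() {
    let state = Mutex::new(vec![7]);
    assert_eq!(lookup(&state, 7).await, Some(7));
    assert_eq!(lookup(&state, 8).await, None);
}

Building the future inside the critical section and awaiting it only after the guard drops keeps disk I/O out of the lock, which is the same invariant the real code enforces with lock_non_send().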
