From 40e2a8a4d21fdb0309315128913f2c523ba47de8 Mon Sep 17 00:00:00 2001
From: Matthew Ahrens
Date: Wed, 19 Jan 2022 13:52:54 -0800
Subject: [PATCH] zcache stats fixes (#113)

- don't abbreviate: rename InsertForSpecRead to InsertForSpeculativeRead
- size the ingestion buffers as 256MiB rather than 256 million bytes
- evictions were being over-counted by add_to_index_or_evict()
- only reads should count as cache hits/misses (not writes, which need to
  check the cache before ingesting)
- update the PendingChanges stat when adding a pending change via ingestion
- extract buffer space management into its own function
---
 .../util/src/zettacache_stats.rs              |   2 +-
 cmd/zfs_object_agent/zcache/src/stats.rs      |   7 +-
 .../zettacache/src/zettacache.rs              | 131 +++++++++++-------
 3 files changed, 86 insertions(+), 54 deletions(-)

diff --git a/cmd/zfs_object_agent/util/src/zettacache_stats.rs b/cmd/zfs_object_agent/util/src/zettacache_stats.rs
index 4ba7832c5506..77136430e6cd 100644
--- a/cmd/zfs_object_agent/util/src/zettacache_stats.rs
+++ b/cmd/zfs_object_agent/util/src/zettacache_stats.rs
@@ -361,7 +361,7 @@ pub enum CacheStatCounter {
     InsertBytes,
     InsertForRead,
     InsertForWrite,
-    InsertForSpecRead,
+    InsertForSpeculativeRead,
     InsertForHealing,
     InsertDropQueueFull,
     InsertDropLockBusy, // pending, DOSE-905
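
In the zcache stats display (next hunk), the renamed counter feeds the same
math as the other insert sources: the four InsertFor* counters are summed into
one scaled insert rate, and each source is then shown as a percentage of that
total. A minimal standalone sketch of that calculation, with invented struct
and field names (the real code reads CacheStatCounter values):

    // Invented snapshot of the four insert counters.
    struct InsertCounts {
        for_read: u64,
        for_write: u64,
        for_speculative_read: u64,
        for_healing: u64,
    }

    // `scale` converts raw counter deltas to a rate (e.g. 1.0 / interval_secs).
    // Returns the total insert rate and speculative reads' share of it.
    fn insert_rate_and_spec_share(c: &InsertCounts, scale: f64) -> (f64, f64) {
        let total = (c.for_read + c.for_write + c.for_speculative_read + c.for_healing)
            as f64
            * scale;
        // Percentage of all inserts that came from speculative reads.
        let spec_pct = if total > 0.0 {
            c.for_speculative_read as f64 * scale / total * 100.0
        } else {
            0.0
        };
        (total, spec_pct)
    }
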
diff --git a/cmd/zfs_object_agent/zcache/src/stats.rs b/cmd/zfs_object_agent/zcache/src/stats.rs
index 86d9483285de..173e1fe91b6f 100644
--- a/cmd/zfs_object_agent/zcache/src/stats.rs
+++ b/cmd/zfs_object_agent/zcache/src/stats.rs
@@ -250,7 +250,7 @@ impl StatsDisplay {
         // INSERTS
         let inserts = (values.value(InsertForRead)
             + values.value(InsertForWrite)
-            + values.value(InsertForSpecRead)
+            + values.value(InsertForSpeculativeRead)
             + values.value(InsertForHealing)) as f64
             * scale;
         self.display_count(inserts);
@@ -260,7 +260,10 @@ impl StatsDisplay {
         if self.show_insert_detail {
             self.display_percent(values.value(InsertForRead) as f64 * scale, inserts);
             self.display_percent(values.value(InsertForWrite) as f64 * scale, inserts);
-            self.display_percent(values.value(InsertForSpecRead) as f64 * scale, inserts);
+            self.display_percent(
+                values.value(InsertForSpeculativeRead) as f64 * scale,
+                inserts,
+            );
             self.display_count(values.value(InsertDropQueueFull) as f64 * scale);
             self.display_count(values.value(InsertDropLockBusy) as f64 * scale);
         }
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
index 0ffbf16eb523..393cbf02db7d 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -41,6 +41,7 @@ use std::time::Duration;
 use std::time::Instant;
 use sysinfo::System;
 use sysinfo::SystemExt;
+use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 use tokio::time::{sleep_until, timeout_at};
 use util::get_tunable;
@@ -67,8 +68,8 @@ lazy_static! {
     static ref HIGH_WATER_CACHE_SIZE_PCT: u64 = get_tunable("high_water_cache_size_pct", 82);
     static ref GHOST_CACHE_SIZE_PCT: u64 = get_tunable("ghost_cache_size_pct", 100);
     static ref QUANTILES_IN_SIZE_HISTOGRAM: usize = get_tunable("quantiles_in_size_histogram", 100);
-    static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256_000_000);
-    static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256_000_000);
+    static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256 * 1024 * 1024);
+    static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256 * 1024 * 1024);
 
     static ref INDEX_CACHE_ENTRIES_MEM_PCT: usize = get_tunable("index_cache_entries_mem_pct", 10);
     static ref DISK_EXPAND_MIN_PCT: f64 = get_tunable("disk_expand_min_pct", 10.0);
@@ -262,6 +263,7 @@ impl MergeState {
                     .unwrap(),
             );
             ghost_entry.value.location = None;
+            self.stats.track_count(Evictions);
         }
         if ghost_entry.value.atime >= self.ghost_cutoff {
             // Preserve ghost entry to our ghost history
@@ -270,7 +272,6 @@ impl MergeState {
             // Entry is now gone from Index, update our traversal position
             index.update_last_key(ghost_entry.key);
         }
-        self.stats.track_count(Evictions);
     }
 }
@@ -587,6 +588,7 @@ pub enum LookupResponse {
     Absent(LockedKey),
 }
 
+#[derive(Clone, Copy, Debug)]
 pub enum InsertSource {
     Heal,
     Read,
@@ -1238,7 +1240,7 @@ impl ZettaCache {
         // so that the index can't change after we get the value from it.
         // Lock ordering requires that we lock the index before locking the state.
         let index = self.index.read().await;
-        let read_data_fut_opt = {
+        let fut_or_f = {
             // We don't want to hold the state lock while reading from disk so we
             // use lock_non_send() to ensure that we can't hold it across .await.
             let mut state = self.state.lock_non_send().await;
@@ -1299,11 +1301,13 @@ impl ZettaCache {
             }
         };
-        let f = match read_data_fut_opt {
-            Either::Left(read_data_fut) => {
-                // pending state tells us what to do
-                let result = read_data_fut.await;
-                if matches!(source, LookupSource::Read | LookupSource::Write) {
+        let f = match fut_or_f {
+            Either::Left(fut) => {
+                // Got the index entry from pending state or index cache and
+                // already called f(). Now that we've dropped the state lock,
+                // run the future that it returned.
+                let result = fut.await;
+                if matches!(source, LookupSource::Read) {
                     self.stats.track_count(CacheHitWithoutIndexRead);
                 }
                 return result;
             }
@@ -1318,7 +1322,7 @@ impl ZettaCache {
 
         // TODO -- is CacheMissWithoutIndexRead possible anymore? See DOSE-939
         let stat_counter;
-        let read_data_fut = match index.log.lookup_by_key(key, |entry| entry.key).await {
+        let fut = match index.log.lookup_by_key(key, |entry| entry.key).await {
             None => {
                 // key not in index
                 stat_counter = CacheMissAfterIndexRead;
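
A consequence of the lookup changes above: writes and healing also call the
lookup path to decide whether a block must be ingested, but only
LookupSource::Read now updates the hit/miss counters, so ingestion-side
existence checks no longer inflate the reported hit rate. A trimmed-down model
of that rule, with invented types (not the agent's real ones):

    #[derive(Clone, Copy)]
    enum LookupSource {
        Read,
        Write,
        SpeculativeRead,
    }

    struct HitMissStats {
        hits: u64,
        misses: u64,
    }

    impl HitMissStats {
        // Writes must look up a key to decide whether to ingest it, but only
        // end-user reads should move the hit/miss numbers.
        fn record(&mut self, source: LookupSource, hit: bool) {
            if matches!(source, LookupSource::Read) {
                if hit {
                    self.hits += 1;
                } else {
                    self.misses += 1;
                }
            }
        }
    }
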
@@ -1346,63 +1350,78 @@ impl ZettaCache {
                 f(&mut state, value)
             }
         };
-        let result = read_data_fut.await;
-        // Update relevant stat after waiting
+        let result = fut.await;
 
-        if matches!(source, LookupSource::Read | LookupSource::Write) {
+        // Update relevant stat after waiting
+        if matches!(source, LookupSource::Read) {
             self.stats.track_count(stat_counter);
         }
 
         result
     }
 
-    /// Initiates insertion of this block; doesn't wait for the write to disk.
-    pub async fn insert(&self, locked_key: LockedKey, bytes: AlignedBytes, source: InsertSource) {
-        // The passed in buffer is only for a single block, which is capped to SPA_MAXBLOCKSIZE,
-        // and thus we should never have an issue converting the length to a "u32" here.
-        let len = u32::try_from(bytes.len()).unwrap();
-
-        self.stats.track_instantaneous(
-            BlockingBufferBytesAvailable,
-            (*CACHE_INSERT_BLOCKING_BUFFER_BYTES
-                - self.blocking_buffer_bytes_available.available_permits()) as u64,
-        );
-        self.stats.track_instantaneous(
-            NonblockingBufferBytesAvailable,
-            (*CACHE_INSERT_NONBLOCKING_BUFFER_BYTES
-                - self.nonblocking_buffer_bytes_available.available_permits()) as u64,
-        );
-
-        // This permit will be dropped when the write to disk completes. It
+    async fn reserve_buffer_space(
+        &self,
+        bytes: usize,
+        source: InsertSource,
+    ) -> Option<OwnedSemaphorePermit> {
+        // The permit should be dropped when the write to disk completes. It
         // serves to limit the number of insert()'s that we can buffer before
         // dropping (ignoring) insertion requests.
-        let insert_permit = match source {
+        let bytes32 = u32::try_from(bytes).unwrap();
+        match source {
             InsertSource::Heal | InsertSource::SpeculativeRead | InsertSource::Write => match self
                 .nonblocking_buffer_bytes_available
                 .clone()
-                .try_acquire_many_owned(len)
+                .try_acquire_many_owned(bytes32)
             {
-                Ok(permit) => permit,
-                Err(tokio::sync::TryAcquireError::NoPermits) => {
-                    self.stats.track_count(InsertDropQueueFull);
-                    return;
+                Ok(permit) => {
+                    self.stats.track_instantaneous(
+                        NonblockingBufferBytesAvailable,
+                        (*CACHE_INSERT_NONBLOCKING_BUFFER_BYTES
+                            - self.nonblocking_buffer_bytes_available.available_permits())
+                            as u64,
+                    );
+                    Some(permit)
                 }
+                Err(tokio::sync::TryAcquireError::NoPermits) => None,
                 Err(e) => panic!("unexpected error from try_acquire_many_owned: {:?}", e),
             },
-            InsertSource::Read => match self
-                .blocking_buffer_bytes_available
-                .clone()
-                .acquire_many_owned(len)
-                .await
-            {
-                Ok(permit) => permit,
-                Err(e) => panic!("unexpected error from acquire_many_owned: {:?}", e),
-            },
+            InsertSource::Read => {
+                let permit = self
+                    .blocking_buffer_bytes_available
+                    .clone()
+                    .acquire_many_owned(bytes32)
+                    .await
+                    .expect("error from acquire_many_owned");
+                self.stats.track_instantaneous(
+                    BlockingBufferBytesAvailable,
+                    (*CACHE_INSERT_BLOCKING_BUFFER_BYTES
+                        - self.blocking_buffer_bytes_available.available_permits())
+                        as u64,
+                );
+                Some(permit)
+            }
+        }
+    }
+
+    /// Initiates insertion of this block; doesn't wait for the write to disk.
+    pub async fn insert(&self, locked_key: LockedKey, bytes: AlignedBytes, source: InsertSource) {
+        // This permit will be dropped when the write to disk completes. It
+        // serves to limit the number of insert()'s that we can buffer before
+        // dropping (ignoring) insertion requests.
+        let insert_permit = match self.reserve_buffer_space(bytes.len(), source).await {
+            Some(permit) => permit,
+            None => {
+                self.stats.track_count(InsertDropQueueFull);
+                return;
+            }
         };
+
         self.stats.track_bytes(InsertBytes, bytes.len() as u64);
         self.stats.track_count(match source {
             InsertSource::Heal => InsertForHealing,
             InsertSource::Read => InsertForRead,
-            InsertSource::SpeculativeRead => InsertForSpecRead,
+            InsertSource::SpeculativeRead => InsertForSpeculativeRead,
             InsertSource::Write => InsertForWrite,
         });
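
The two ingestion buffers in the hunk above behave like byte-granularity
semaphores: each in-flight insert holds one permit per buffered byte, released
when the write to disk completes and the OwnedSemaphorePermit is dropped.
Reads wait for space; healing, speculative-read, and write ingestion instead
drop the request when the buffer is full. A minimal standalone sketch of the
same pattern, with invented struct and method names:

    use std::sync::Arc;
    use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

    // Hypothetical stand-in for the zettacache's two ingestion buffers.
    struct IngestBuffers {
        blocking: Arc<Semaphore>,    // reads wait here for space
        nonblocking: Arc<Semaphore>, // other sources drop on NoPermits
    }

    impl IngestBuffers {
        fn new(bytes: usize) -> Self {
            Self {
                blocking: Arc::new(Semaphore::new(bytes)),
                nonblocking: Arc::new(Semaphore::new(bytes)),
            }
        }

        // Returns a permit covering `len` bytes, or None if a non-blocking
        // reservation would overflow the buffer. Dropping the permit gives
        // the bytes back, just like the patch's insert_permit.
        async fn reserve(&self, len: u32, must_wait: bool) -> Option<OwnedSemaphorePermit> {
            if must_wait {
                // acquire_many_owned() only errors if the semaphore is closed.
                Some(self.blocking.clone().acquire_many_owned(len).await.unwrap())
            } else {
                match self.nonblocking.clone().try_acquire_many_owned(len) {
                    Ok(permit) => Some(permit),
                    Err(TryAcquireError::NoPermits) => None,
                    Err(e) => panic!("unexpected: {:?}", e),
                }
            }
        }
    }

Dropping the returned permit is what releases the reserved bytes, which is why
the patch threads insert_permit through to the write-completion path.
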
@@ -1807,8 +1826,7 @@ impl ZettaCacheState {
                 // don't have to traverse the tree again.
                 self.pending_changes
                     .insert(key, PendingChange::UpdateAtime(value, original_atime));
-                self.stats
-                    .track_instantaneous(PendingChanges, self.pending_changes.len() as u64);
+                self.update_pending_stats();
             }
         }
         if matches!(source, LookupSource::Read) {
@@ -1848,6 +1866,17 @@ impl ZettaCacheState {
         }
     }
 
+    fn update_pending_stats(&self) {
+        let old_pending = match &self.merge {
+            Some(ms) => ms.old_pending_changes.len() as u64,
+            None => 0,
+        };
+        self.stats.track_instantaneous(
+            PendingChanges,
+            old_pending + self.pending_changes.len() as u64,
+        );
+    }
+
     fn remove_from_index(&mut self, key: IndexKey, value: IndexValue) {
         let mut oplog_value = value;
         match self.pending_changes.get_mut(&key) {
@@ -1903,8 +1932,7 @@ impl ZettaCacheState {
         self.atime_histogram.remove(value);
         self.operation_log
             .append(OperationLogEntry::Remove(key, oplog_value));
-        self.stats
-            .track_instantaneous(PendingChanges, self.pending_changes.len() as u64);
+        self.update_pending_stats();
     }
 
     /// Insert this block to the cache, if space and performance parameters
@@ -1952,6 +1980,7 @@ impl ZettaCacheState {
             btree_map::Entry::Vacant(ve) => {
                 super_trace!("adding Insert to pending_changes {:?} {:?}", key, value);
                 ve.insert(PendingChange::Insert(value));
+                self.update_pending_stats();
             }
         }
         self.atime_histogram.insert(value);
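
The new update_pending_stats() makes PendingChanges a true gauge: while a
merge is running, changes that predate the merge live in
merge.old_pending_changes, so the stat must sum both maps rather than report
only the live one. A minimal model of that accounting, with invented types:

    use std::collections::BTreeMap;

    // Invented stand-ins for the agent's pending-change bookkeeping.
    struct MergeState {
        old_pending_changes: BTreeMap<u64, ()>,
    }

    struct CacheState {
        pending_changes: BTreeMap<u64, ()>,
        merge: Option<MergeState>,
    }

    impl CacheState {
        // Mirrors update_pending_stats(): count pending changes in the live
        // map plus, while a merge is running, the frozen map.
        fn pending_changes_gauge(&self) -> u64 {
            let old_pending = match &self.merge {
                Some(ms) => ms.old_pending_changes.len() as u64,
                None => 0,
            };
            old_pending + self.pending_changes.len() as u64
        }
    }
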