diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
index 6c2451a01469..224e10036a46 100644
--- a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
+++ b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
@@ -11,6 +11,7 @@ use futures::StreamExt;
 use futures_core::Stream;
 use lazy_static::lazy_static;
 use log::*;
+use lru::LruCache;
 use more_asserts::*;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
@@ -22,16 +23,24 @@ use std::ops::Add;
 use std::ops::Bound::*;
 use std::ops::Sub;
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::time::Instant;
 use util::get_tunable;
 use util::super_trace;
 use util::zettacache_stats::DiskIoType;
 use util::AlignedVec;
+use util::From64;
+use util::LockSet;
 
 lazy_static! {
     static ref ENTRIES_PER_CHUNK: usize = get_tunable("entries_per_chunk", 200);
     // Note: kernel sends writes to disk in at most 256K chunks (at least with nvme driver)
     static ref WRITE_AGGREGATION_SIZE: usize = get_tunable("write_aggregation_size", 256 * 1024);
+    // We primarily use the chunk cache to ensure that when looking up all the
+    // entries in an object, we need at most one read from the index. So we
+    // only need as many chunks in the cache as the number of objects that we
+    // might be processing concurrently.
+    static ref CHUNK_CACHE_ENTRIES: usize = get_tunable("chunk_cache_entries", 128);
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -137,8 +146,6 @@ pub struct BlockBasedLogWithSummaryPhys<T: BlockBasedLogEntry> {
     this: BlockBasedLogPhys<T>,
     #[serde(bound(deserialize = "T: DeserializeOwned"))]
     chunk_summary: BlockBasedLogPhys<BlockBasedLogChunkSummaryEntry<T>>,
-    #[serde(bound(deserialize = "T: DeserializeOwned"))]
-    last_entry: Option<T>,
 }
 
 impl<T: BlockBasedLogEntry> Default for BlockBasedLogWithSummaryPhys<T> {
@@ -146,7 +153,6 @@ impl<T: BlockBasedLogEntry> Default for BlockBasedLogWithSummaryPhys<T> {
         Self {
             this: Default::default(),
             chunk_summary: Default::default(),
-            last_entry: Default::default(),
         }
     }
 }
@@ -202,7 +208,8 @@ pub struct BlockBasedLogWithSummary<T: BlockBasedLogEntry> {
     this: BlockBasedLog<T>,
     chunk_summary: BlockBasedLog<BlockBasedLogChunkSummaryEntry<T>>,
     chunks: Vec<BlockBasedLogChunkSummaryEntry<T>>,
-    last_entry: Option<T>,
+    chunk_cache: Mutex<LruCache<ChunkId, BlockBasedLogChunk<T>>>,
+    chunk_reads: LockSet<ChunkId>,
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -423,7 +430,8 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
             this: BlockBasedLog::open(block_access.clone(), extent_allocator.clone(), phys.this),
             chunk_summary,
             chunks,
-            last_entry: phys.last_entry,
+            chunk_cache: Mutex::new(LruCache::new(*CHUNK_CACHE_ENTRIES)),
+            chunk_reads: Default::default(),
         }
     }
 
@@ -451,7 +459,6 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
         BlockBasedLogWithSummaryPhys {
             this: new_this,
             chunk_summary: new_chunk_summary,
-            last_entry: self.last_entry,
         }
     }
 
@@ -463,7 +470,6 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
         BlockBasedLogWithSummaryPhys {
             this: self.this.phys.clone(),
             chunk_summary: self.chunk_summary.phys.clone(),
-            last_entry: self.last_entry,
         }
     }
 
@@ -482,12 +488,10 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
     }
 
     pub fn append(&mut self, entry: T) {
-        self.last_entry = Some(entry);
         self.this.append(entry);
     }
 
     pub fn clear(&mut self) {
-        self.last_entry = None;
         self.this.clear();
         self.chunk_summary.clear();
     }
@@ -498,7 +502,8 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
     }
 
     /// Returns the exact location/size of this chunk (not the whole contiguous extent)
-    fn chunk_extent(&self, chunk_id: usize) -> Extent {
+    fn chunk_extent(&self, chunk_id: ChunkId) -> Extent {
+        let chunk_id = usize::from64(chunk_id.0);
         let chunk_summary = self.chunks[chunk_id];
         let chunk_size = if chunk_id == self.chunks.len() - 1 {
             self.this.phys.next_chunk_offset - chunk_summary.offset
@@ -523,40 +528,46 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
     {
         assert_eq!(ChunkId(self.chunks.len() as u64), self.this.phys.next_chunk);
 
-        // Check if the key is after the last entry.
-        // XXX Note that this won't be very useful when there are multiple
-        // pools. When doing writes to all but the "last" pool, this check will
-        // fail, and we'll have to read from the index for every write. We
-        // could address this by having one index (BlockBasedLogWithSummary) per
-        // pool, or by replacing the last_entry optimization with a small cache
-        // of BlockBasedLogChunk's.
-        match self.last_entry {
-            Some(last_entry) => {
-                if key > &f(&last_entry) {
-                    assert_gt!(key, &f(&self.chunks.last().unwrap().first_entry));
-                    return None;
-                }
-            }
-            None => {
-                assert!(self.chunks.is_empty());
-                return None;
-            }
-        }
-
         // Find the chunk_id that this key belongs in.
         let chunk_id = match self
            .chunks
            .binary_search_by_key(key, |chunk_summary| f(&chunk_summary.first_entry))
         {
-            Ok(index) => index,
+            Ok(index) => ChunkId(index as u64),
             Err(index) if index == 0 => return None, // key is before the first chunk, therefore not present
-            Err(index) => index - 1,
+            Err(index) => ChunkId(index as u64 - 1),
         };
 
+        if let Some(chunk) = self.chunk_cache.lock().unwrap().get(&chunk_id) {
+            super_trace!("found {:?} in cache", chunk_id);
+            // found in cache
+            // Search within this chunk.
+            return chunk
+                .entries
+                .binary_search_by_key(key, f)
+                .ok()
+                .map(|index| chunk.entries[index]);
+        }
+
+        // Lock the chunk so that only one thread reads it
+        let _guard = self.chunk_reads.lock(chunk_id).await;
+
+        // Check again in case another thread already read it
+        if let Some(chunk) = self.chunk_cache.lock().unwrap().get(&chunk_id) {
+            super_trace!("found {:?} in cache after waiting for lock", chunk_id);
+            // found in cache
+            // Search within this chunk.
+            return chunk
+                .entries
+                .binary_search_by_key(key, f)
+                .ok()
+                .map(|index| chunk.entries[index]);
+        }
+
         // Read the chunk from disk.
         let chunk_extent = self.chunk_extent(chunk_id);
         super_trace!(
-            "reading log chunk {} at {:?} to lookup {:?}",
+            "reading {:?} at {:?} to lookup {:?}",
             chunk_id,
             chunk_extent,
             key
@@ -568,16 +579,20 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
             .await;
         let (chunk, _consumed): (BlockBasedLogChunk<T>, usize) =
             self.this.block_access.chunk_from_raw(&chunk_bytes).unwrap();
-        assert_eq!(chunk.id, ChunkId(chunk_id as u64));
-
-        // XXX Can we assert that we are looking in the right chunk? I think
-        // we'd need the chunk to have the next chunk's first key as well.
+        assert_eq!(chunk.id, chunk_id);
 
         // Search within this chunk.
-        match chunk.entries.binary_search_by_key(key, f) {
-            Ok(index) => Some(chunk.entries[index]),
-            Err(_) => None,
-        }
+        let result = chunk
+            .entries
+            .binary_search_by_key(key, f)
+            .ok()
+            .map(|index| chunk.entries[index]);
+
+        // add to cache
+        super_trace!("inserting {:?} to cache", chunk_id);
+        self.chunk_cache.lock().unwrap().put(chunk_id, chunk);
+
+        result
     }
 
     /// Entries must have been added in sorted order, according to the provided
@@ -630,7 +645,9 @@ impl Sub for LogOffset {
     }
 }
 
-#[derive(Serialize, Deserialize, Default, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
+#[derive(
+    Serialize, Deserialize, Default, Debug, Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd,
+)]
 pub struct ChunkId(u64);
 impl ChunkId {
     pub fn next(&self) -> ChunkId {
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
index 93f024e0d4f0..0255b589918e 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -1378,6 +1378,8 @@ impl ZettaCache {
         let fut = match index.log.lookup_by_key(key, |entry| entry.key).await {
             None => {
                 // key not in index
+                // XXX We don't really know if we read the index from disk. We
+                // might have hit in the index chunk cache. Same below.
                 stat_counter = CacheMissAfterIndexRead;
                 super_trace!("cache miss after reading index for {:?}", key);
                 let mut state = self.state.lock_non_send().await;
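Note on the pattern (not part of the patch): the new lookup path is a read-through cache with per-key serialization of reads: check the cache, take a per-chunk lock, check the cache again, and only then read from disk and populate the cache. The sketch below is a minimal stand-alone illustration of that double-checked flow. It assumes the tokio runtime and substitutes a plain HashMap and a per-chunk tokio::sync::Mutex for the crate-internal lru::LruCache and util::LockSet; ChunkStore, read_lock, and the u64 stand-ins for ChunkId and log entries are hypothetical names, not zettacache API.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type ChunkId = u64; // stand-in for the real ChunkId newtype
type Chunk = Vec<u64>; // stand-in for BlockBasedLogChunk<T>

struct ChunkStore {
    // Recently read chunks. The patch uses lru::LruCache bounded by the
    // chunk_cache_entries tunable; this sketch uses an unbounded HashMap.
    cache: Mutex<HashMap<ChunkId, Chunk>>,
    // One async mutex per chunk being read, so that concurrent lookups of the
    // same chunk trigger a single read (the role util::LockSet plays above).
    read_locks: Mutex<HashMap<ChunkId, Arc<tokio::sync::Mutex<()>>>>,
}

impl ChunkStore {
    fn read_lock(&self, id: ChunkId) -> Arc<tokio::sync::Mutex<()>> {
        self.read_locks
            .lock()
            .unwrap()
            .entry(id)
            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
            .clone()
    }

    async fn lookup(&self, id: ChunkId, key: u64) -> Option<u64> {
        // Fast path: the chunk is already cached.
        if let Some(chunk) = self.cache.lock().unwrap().get(&id) {
            return chunk.binary_search(&key).ok().map(|i| chunk[i]);
        }

        // Serialize reads of this chunk, then re-check the cache: another
        // task may have read and inserted it while we waited for the lock.
        let lock = self.read_lock(id);
        let _guard = lock.lock().await;
        if let Some(chunk) = self.cache.lock().unwrap().get(&id) {
            return chunk.binary_search(&key).ok().map(|i| chunk[i]);
        }

        // "Read from disk" (fabricated data here) and populate the cache.
        let chunk: Chunk = (id * 100..id * 100 + 10).collect();
        let result = chunk.binary_search(&key).ok().map(|i| chunk[i]);
        self.cache.lock().unwrap().insert(id, chunk);
        result
    }
}

#[tokio::main]
async fn main() {
    let store = ChunkStore {
        cache: Mutex::new(HashMap::new()),
        read_locks: Mutex::new(HashMap::new()),
    };
    // The first lookup reads chunk 3 and caches it; the second is a cache hit.
    assert_eq!(store.lookup(3, 305).await, Some(305));
    assert_eq!(store.lookup(3, 305).await, Some(305));
}

Holding the per-chunk lock across the read is what guarantees at most one read per chunk under concurrent lookups, and the second cache check after acquiring the lock is what keeps the waiters from repeating that read. Unlike this sketch, the patch bounds cache memory via the chunk_cache_entries tunable (128 chunks by default) and uses util::LockSet rather than a grow-only map of mutexes.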