index chunk cache (openzfs#114)
Add a cache of chunks of the index, at the BlockBasedLogWithSummary
layer. The big win here is being able to determine that a block is NOT
in the cache without reading from disk. This is needed primarily when
ingesting newly-written blocks, and when doing sibling block ingestion.
In both of these cases, we have to check if the blocks are already
present, but we are checking many adjacent BlockIds, whose entries
would fall in the same chunk of the index. The cache also eliminates the
need for the last_entry special case.
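For illustration, here is a minimal, self-contained sketch of the lookup pattern the diff below implements: check an in-memory cache of decoded index chunks first, and fall back to a disk read (then populate the cache) only on a miss, so a negative answer for a cached chunk costs no I/O. The names here (ChunkCache, Chunk, read_chunk_from_disk) are hypothetical simplifications, not the agent's API; the actual code uses an lru::LruCache bounded by a tunable plus a per-chunk LockSet so concurrent misses on the same chunk trigger only one read.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical, simplified stand-ins for the agent's types.
type ChunkId = u64;
type BlockId = u64;

struct Chunk {
    // Entries sorted by BlockId, as in a BlockBasedLogChunk.
    entries: Vec<(BlockId, u64 /* location */)>,
}

struct ChunkCache {
    // The real code uses an lru::LruCache bounded by CHUNK_CACHE_ENTRIES;
    // an unbounded HashMap keeps this sketch dependency-free.
    cache: Mutex<HashMap<ChunkId, Arc<Chunk>>>,
}

impl ChunkCache {
    fn new() -> Self {
        Self { cache: Mutex::new(HashMap::new()) }
    }

    /// Look up `key` in the chunk that would contain it, reading the chunk
    /// from disk at most once and serving repeated lookups (e.g. adjacent
    /// BlockIds during ingestion) from memory.
    fn lookup(&self, chunk_id: ChunkId, key: BlockId) -> Option<u64> {
        // Fast path: the chunk is cached, so we can answer -- including a
        // negative answer -- without touching disk.
        if let Some(chunk) = self.cache.lock().unwrap().get(&chunk_id).cloned() {
            return chunk
                .entries
                .binary_search_by_key(&key, |e| e.0)
                .ok()
                .map(|i| chunk.entries[i].1);
        }

        // Slow path: read and decode the chunk, then cache it. (The real
        // code also holds a per-chunk LockSet guard here so concurrent
        // lookups of the same chunk cause only a single disk read.)
        let chunk = Arc::new(read_chunk_from_disk(chunk_id));
        let result = chunk
            .entries
            .binary_search_by_key(&key, |e| e.0)
            .ok()
            .map(|i| chunk.entries[i].1);
        self.cache.lock().unwrap().insert(chunk_id, chunk);
        result
    }
}

// Placeholder for the actual disk read + deserialization.
fn read_chunk_from_disk(chunk_id: ChunkId) -> Chunk {
    Chunk { entries: vec![(chunk_id * 100, 0), (chunk_id * 100 + 1, 1)] }
}

fn main() {
    let cache = ChunkCache::new();
    assert_eq!(cache.lookup(3, 301), Some(1)); // first lookup reads "disk"
    assert_eq!(cache.lookup(3, 999), None); // negative answer, no disk access
}
```

Once chunk 3 is cached, the second lookup returns None without any read, which is the property the commit message calls out.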
ahrens authored Jan 21, 2022
1 parent 6dd9591 commit 2b03795
Showing 2 changed files with 61 additions and 42 deletions.
101 changes: 59 additions & 42 deletions cmd/zfs_object_agent/zettacache/src/block_based_log.rs
@@ -11,6 +11,7 @@ use futures::StreamExt;
use futures_core::Stream;
use lazy_static::lazy_static;
use log::*;
use lru::LruCache;
use more_asserts::*;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -22,16 +23,24 @@ use std::ops::Add;
use std::ops::Bound::*;
use std::ops::Sub;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
use util::get_tunable;
use util::super_trace;
use util::zettacache_stats::DiskIoType;
use util::AlignedVec;
use util::From64;
use util::LockSet;

lazy_static! {
static ref ENTRIES_PER_CHUNK: usize = get_tunable("entries_per_chunk", 200);
// Note: kernel sends writes to disk in at most 256K chunks (at least with nvme driver)
static ref WRITE_AGGREGATION_SIZE: usize = get_tunable("write_aggregation_size", 256 * 1024);
// We primarily use the chunk cache to ensure that when looking up all the
// entries in an object, we need at most one read from the index. So we
// only need as many chunks in the cache as the number of objects that we
// might be processing concurrently.
static ref CHUNK_CACHE_ENTRIES: usize = get_tunable("chunk_cache_entries", 128);
}

#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -137,16 +146,13 @@ pub struct BlockBasedLogWithSummaryPhys<T: BlockBasedLogEntry> {
this: BlockBasedLogPhys<T>,
#[serde(bound(deserialize = "T: DeserializeOwned"))]
chunk_summary: BlockBasedLogPhys<BlockBasedLogChunkSummaryEntry<T>>,
#[serde(bound(deserialize = "T: DeserializeOwned"))]
last_entry: Option<T>,
}

impl<T: BlockBasedLogEntry> Default for BlockBasedLogWithSummaryPhys<T> {
fn default() -> Self {
Self {
this: Default::default(),
chunk_summary: Default::default(),
last_entry: Default::default(),
}
}
}
@@ -202,7 +208,8 @@ pub struct BlockBasedLogWithSummary<T: BlockBasedLogEntry> {
this: BlockBasedLog<T>,
chunk_summary: BlockBasedLog<BlockBasedLogChunkSummaryEntry<T>>,
chunks: Vec<BlockBasedLogChunkSummaryEntry<T>>,
last_entry: Option<T>,
chunk_cache: Mutex<LruCache<ChunkId, BlockBasedLogChunk<T>>>,
chunk_reads: LockSet<ChunkId>,
}

#[derive(Serialize, Deserialize, Debug)]
@@ -423,7 +430,8 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
this: BlockBasedLog::open(block_access.clone(), extent_allocator.clone(), phys.this),
chunk_summary,
chunks,
last_entry: phys.last_entry,
chunk_cache: Mutex::new(LruCache::new(*CHUNK_CACHE_ENTRIES)),
chunk_reads: Default::default(),
}
}

@@ -451,7 +459,6 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
BlockBasedLogWithSummaryPhys {
this: new_this,
chunk_summary: new_chunk_summary,
last_entry: self.last_entry,
}
}

@@ -463,7 +470,6 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
BlockBasedLogWithSummaryPhys {
this: self.this.phys.clone(),
chunk_summary: self.chunk_summary.phys.clone(),
last_entry: self.last_entry,
}
}

@@ -482,12 +488,10 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
}

pub fn append(&mut self, entry: T) {
self.last_entry = Some(entry);
self.this.append(entry);
}

pub fn clear(&mut self) {
self.last_entry = None;
self.this.clear();
self.chunk_summary.clear();
}
@@ -498,7 +502,8 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
}

/// Returns the exact location/size of this chunk (not the whole contiguous extent)
fn chunk_extent(&self, chunk_id: usize) -> Extent {
fn chunk_extent(&self, chunk_id: ChunkId) -> Extent {
let chunk_id = usize::from64(chunk_id.0);
let chunk_summary = self.chunks[chunk_id];
let chunk_size = if chunk_id == self.chunks.len() - 1 {
self.this.phys.next_chunk_offset - chunk_summary.offset
@@ -523,40 +528,46 @@
{
assert_eq!(ChunkId(self.chunks.len() as u64), self.this.phys.next_chunk);

// Check if the key is after the last entry.
// XXX Note that this won't be very useful when there are multiple
// pools. When doing writes to all but the "last" pool, this check will
// fail, and we'll have to read from the index for every write. We
// could address this by having one index (BlockBasedLogWithSummary) per
// pool, or by replacing the last_entry optimization with a small cache
// of BlockBasedLogChunk's.
match self.last_entry {
Some(last_entry) => {
if key > &f(&last_entry) {
assert_gt!(key, &f(&self.chunks.last().unwrap().first_entry));
return None;
}
}
None => {
assert!(self.chunks.is_empty());
return None;
}
}

// Find the chunk_id that this key belongs in.
let chunk_id = match self
.chunks
.binary_search_by_key(key, |chunk_summary| f(&chunk_summary.first_entry))
{
Ok(index) => index,
Ok(index) => ChunkId(index as u64),
Err(index) if index == 0 => return None, // key is before the first chunk, therefore not present
Err(index) => index - 1,
Err(index) => ChunkId(index as u64 - 1),
};

if let Some(chunk) = self.chunk_cache.lock().unwrap().get(&chunk_id) {
super_trace!("found {:?} in cache", chunk_id);
// found in cache
// Search within this chunk.
return chunk
.entries
.binary_search_by_key(key, f)
.ok()
.map(|index| chunk.entries[index]);
}

// Lock the chunk so that only one thread reads it
let _guard = self.chunk_reads.lock(chunk_id).await;

// Check again in case another thread already read it
if let Some(chunk) = self.chunk_cache.lock().unwrap().get(&chunk_id) {
super_trace!("found {:?} in cache after waiting for lock", chunk_id);
// found in cache
// Search within this chunk.
return chunk
.entries
.binary_search_by_key(key, f)
.ok()
.map(|index| chunk.entries[index]);
}

// Read the chunk from disk.
let chunk_extent = self.chunk_extent(chunk_id);
super_trace!(
"reading log chunk {} at {:?} to lookup {:?}",
"reading {:?} at {:?} to lookup {:?}",
chunk_id,
chunk_extent,
key
Expand All @@ -568,16 +579,20 @@ impl<T: BlockBasedLogEntry> BlockBasedLogWithSummary<T> {
.await;
let (chunk, _consumed): (BlockBasedLogChunk<T>, usize) =
self.this.block_access.chunk_from_raw(&chunk_bytes).unwrap();
assert_eq!(chunk.id, ChunkId(chunk_id as u64));

// XXX Can we assert that we are looking in the right chunk? I think
// we'd need the chunk to have the next chunk's first key as well.
assert_eq!(chunk.id, chunk_id);

// Search within this chunk.
match chunk.entries.binary_search_by_key(key, f) {
Ok(index) => Some(chunk.entries[index]),
Err(_) => None,
}
let result = chunk
.entries
.binary_search_by_key(key, f)
.ok()
.map(|index| chunk.entries[index]);

// add to cache
super_trace!("inserting {:?} to cache", chunk_id);
self.chunk_cache.lock().unwrap().put(chunk_id, chunk);

result
}

/// Entries must have been added in sorted order, according to the provided
@@ -630,7 +645,9 @@ impl Sub<LogOffset> for LogOffset {
}
}

#[derive(Serialize, Deserialize, Default, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
#[derive(
Serialize, Deserialize, Default, Debug, Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd,
)]
pub struct ChunkId(u64);
impl ChunkId {
pub fn next(&self) -> ChunkId {
2 changes: 2 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -1378,6 +1378,8 @@ impl ZettaCache {
let fut = match index.log.lookup_by_key(key, |entry| entry.key).await {
None => {
// key not in index
// XXX We don't really know if we read the index from disk. We
// might have hit in the index chunk cache. Same below.
stat_counter = CacheMissAfterIndexRead;
super_trace!("cache miss after reading index for {:?}", key);
let mut state = self.state.lock_non_send().await;
