zettacache locking (openzfs#401)
Because we can have concurrent "read block" requests for the same block, we can also have concurrent accesses to the same block of the zettacache. In particular, two "read block" requests for the same block may each call lookup() in the zettacache, find the entry absent, and then insert() it. This can lead to several problems, especially in subtle cases such as when the first insert() has already been flushed to the index before the second insert() is called.

This PR solves the issue by adding per-block locking to the zettacache. A failed lookup() now returns with the entry locked, blocking concurrent lookup()s of the same block until the first thread either insert()s the value or drops the LockedKey.
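
In caller terms the protocol looks roughly like the sketch below (it mirrors the pool.rs change in this commit; fetch_from_store is a hypothetical stand-in for the object-store read):

    // Hedged sketch of the caller protocol, not code from this commit.
    // `fetch_from_store` is a hypothetical helper.
    match cache.lookup(guid, block).await {
        // Hit: no per-block lock is taken.
        LookupResponse::Present(data) => data,
        // Miss: `locked_key` holds the per-block lock; concurrent
        // lookup()s of this block now wait.
        LookupResponse::Absent(locked_key) => {
            let data = fetch_from_store(block).await;
            // insert() consumes the LockedKey, waking any waiters.
            cache.insert(locked_key, data.clone()).await;
            // Dropping `locked_key` without inserting would also wake
            // them, and they would then redo their own lookup().
            data
        }
    }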
ahrens authored Aug 17, 2021
1 parent e0cde5b commit c128ddf
Showing 5 changed files with 115 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/zettacache/src/index.rs
@@ -6,7 +6,7 @@ use crate::zettacache::AtimeHistogramPhys;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
-#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
+#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
 pub struct IndexKey {
     pub guid: PoolGuid,
     pub block: BlockId,
2 changes: 2 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/lib.rs
@@ -4,11 +4,13 @@ mod block_allocator;
 mod block_based_log;
 mod extent_allocator;
 mod index;
+mod lock_set;
 mod range_tree;
 mod space_map;
 mod tunable;
 mod zettacache;
 
 pub use tunable::get_tunable;
 pub use tunable::read_tunable_config;
+pub use zettacache::LookupResponse;
 pub use zettacache::ZettaCache;
71 changes: 71 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/lock_set.rs
@@ -0,0 +1,71 @@
+use log::*;
+use std::fmt::Debug;
+use std::{
+    collections::{hash_map, HashMap},
+    hash::Hash,
+    sync::{Arc, Mutex},
+};
+use tokio::sync::watch;
+
+#[derive(Default, Debug, Clone)]
+pub struct LockSet<V: Hash + Eq + Copy + Debug> {
+    locks: Arc<Mutex<HashMap<V, watch::Receiver<()>>>>,
+}
+
+pub struct LockedItem<V: Hash + Eq + Copy + Debug> {
+    value: V,
+    tx: watch::Sender<()>,
+    set: LockSet<V>,
+}
+
+impl<V: Hash + Eq + Copy + Debug> Drop for LockedItem<V> {
+    fn drop(&mut self) {
+        trace!("{:?}: removing lock", self.value);
+        let rx = self.set.locks.lock().unwrap().remove(&self.value);
+        assert!(rx.is_some());
+        // This unwrap can't fail because there is still a receiver, `rx`.
+        self.tx.send(()).unwrap();
+    }
+}
+
+impl<V: Hash + Eq + Copy + Debug> LockedItem<V> {
+    pub fn value(&self) -> &V {
+        &self.value
+    }
+}
+
+impl<V: Hash + Eq + Copy + Debug> LockSet<V> {
+    pub fn new() -> Self {
+        Self {
+            locks: Default::default(),
+        }
+    }
+
+    pub async fn lock(&self, value: V) -> LockedItem<V> {
+        let tx = loop {
+            let mut rx = {
+                match self.locks.lock().unwrap().entry(value) {
+                    hash_map::Entry::Occupied(oe) => oe.get().clone(),
+                    hash_map::Entry::Vacant(ve) => {
+                        let (tx, rx) = watch::channel(());
+                        ve.insert(rx);
+                        break tx;
+                    }
+                }
+            };
+            trace!("{:?}: waiting for existing lock", value);
+            // Note: since we don't hold the locks mutex now, the corresponding
+            // LockedItem may have been dropped, in which case the sender was
+            // dropped. In this case, the changed() Result will be an Err,
+            // which we ignore with ok().
+            rx.changed().await.ok();
+        };
+        trace!("{:?}: inserted new lock", value);
+
+        LockedItem {
+            value,
+            tx,
+            set: self.clone(),
+        }
+    }
+}
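
For orientation, a minimal usage sketch of LockSet (hypothetical test code, not part of this commit; lock_set is a private module here, so this assumes it were exported):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let locks: LockSet<u64> = LockSet::new();

        let locks2 = locks.clone();
        let waiter = tokio::spawn(async move {
            // Whichever task locks second blocks here until the first
            // drops its LockedItem.
            let _guard = locks2.lock(42).await;
            println!("contending task acquired the lock");
        });

        let guard = locks.lock(42).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        drop(guard); // Drop sends on the watch channel, waking the waiter.
        waiter.await.unwrap();
    }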
38 changes: 27 additions & 11 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -7,6 +7,8 @@ use crate::extent_allocator::ExtentAllocator;
 use crate::extent_allocator::ExtentAllocatorPhys;
 use crate::get_tunable;
 use crate::index::*;
+use crate::lock_set::LockSet;
+use crate::lock_set::LockedItem;
 use anyhow::Result;
 use futures::future;
 use futures::stream::*;
@@ -111,6 +113,7 @@ pub struct ZettaCache {
     index: Arc<tokio::sync::RwLock<ZettaCacheIndex>>,
     // XXX may need to break up this big lock. At least we aren't holding it while doing i/o
     state: Arc<tokio::sync::Mutex<ZettaCacheState>>,
+    outstanding_lookups: LockSet<IndexKey>,
     metrics: Arc<ZettaCacheMetrics>,
 }
 
@@ -234,11 +237,20 @@ struct ZettaCacheState {
     // from the tree. These are needed to prevent the ExtentAllocator from
     // overwriting them while i/o is in flight, and to ensure that writes
     // complete before we complete the next checkpoint.
+    // XXX I don't think we do lookups here so these could be Vec's?
     outstanding_reads: BTreeMap<IndexValue, Arc<Semaphore>>,
     outstanding_writes: BTreeMap<IndexValue, Arc<Semaphore>>,
 
     atime: Atime,
 }
+
+pub struct LockedKey(LockedItem<IndexKey>);
+
+pub enum LookupResponse {
+    Present(Vec<u8>),
+    Absent(LockedKey),
+}
+
 #[metered(registry=ZettaCacheMetrics)]
 impl ZettaCache {
     pub async fn create(path: &str) {
@@ -345,8 +357,8 @@ impl ZettaCache {
             ),
             operation_log,
             super_phys: phys,
-            outstanding_reads: BTreeMap::new(),
-            outstanding_writes: BTreeMap::new(),
+            outstanding_reads: Default::default(),
+            outstanding_writes: Default::default(),
             atime: checkpoint.last_atime,
             block_allocator: BlockAllocator::open(
                 block_access.clone(),
@@ -360,6 +372,7 @@
         let this = ZettaCache {
             index: Arc::new(tokio::sync::RwLock::new(index)),
             state: Arc::new(tokio::sync::Mutex::new(state)),
+            outstanding_lookups: LockSet::new(),
            metrics: Default::default(),
         };
 
@@ -512,16 +525,18 @@ impl ZettaCache {
     #[measure(InFlight)]
     #[measure(Throughput)]
     #[measure(HitCount)]
-    pub async fn lookup(&self, guid: PoolGuid, block: BlockId) -> Option<Vec<u8>> {
+    pub async fn lookup(&self, guid: PoolGuid, block: BlockId) -> LookupResponse {
         // We want to hold the index lock over the whole operation so that the index can't change after we get the value from it.
         // Lock ordering requires that we lock the index before locking the state.
-        let index = self.index.read().await;
         let key = IndexKey { guid, block };
+        let locked_key = LockedKey(self.outstanding_lookups.lock(key).await);
+        let index = self.index.read().await;
         let read_data_fut_opt = {
             // We don't want to hold the state lock while reading from disk. We
             // use lock_state_non_async() to ensure that we can't hold it across
             // .await.
             let mut state = self.lock_state_non_async().await;
+
             match state.pending_changes.get(&key).copied() {
                 Some(pc) => {
                     match pc {
@@ -546,11 +561,11 @@ impl ZettaCache {
             match read_data_fut.await {
                 Some(vec) => {
                     self.cache_hit_without_index_read(&key);
-                    return Some(vec);
+                    return LookupResponse::Present(vec);
                 }
                 None => {
                     self.cache_miss_without_index_read(&key);
-                    return None;
+                    return LookupResponse::Absent(locked_key);
                 }
             }
         }
@@ -561,7 +576,7 @@ impl ZettaCache {
             None => {
                 // key not in index
                 self.cache_miss_after_index_read(&key);
-                None
+                LookupResponse::Absent(locked_key)
             }
             Some(entry) => {
                 // read data from location indicated by index
@@ -576,11 +591,11 @@ impl ZettaCache {
                 match read_data_fut.await {
                     Some(vec) => {
                         self.cache_hit_after_index_read(&key);
-                        Some(vec)
+                        LookupResponse::Present(vec)
                     }
                     None => {
                         self.cache_miss_after_index_read(&key);
-                        None
+                        LookupResponse::Absent(locked_key)
                     }
                 }
             }
@@ -591,9 +606,10 @@ impl ZettaCache {
     #[measure(InFlight)]
     #[measure(Throughput)]
     #[measure(HitCount)]
-    pub async fn insert(&self, guid: PoolGuid, block: BlockId, buf: Vec<u8>) {
+    pub async fn insert(&self, locked_key: LockedKey, buf: Vec<u8>) {
         let mut state = self.state.lock().await;
-        state.insert(guid, block, buf);
+        let index_key = locked_key.0.value();
+        state.insert(index_key.guid, index_key.block, buf);
     }
 }
 
22 changes: 14 additions & 8 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -38,6 +38,7 @@ use tokio::time::sleep;
 use uuid::Uuid;
 use zettacache::base_types::*;
 use zettacache::get_tunable;
+use zettacache::LookupResponse;
 use zettacache::ZettaCache;
 
 lazy_static! {
@@ -1375,11 +1376,13 @@
 
     pub async fn read_block(&self, block: BlockId) -> Vec<u8> {
         // check in ZettaCache
-        if let Some(cache) = &self.state.zettacache {
-            if let Some(v) = cache.lookup(self.state.shared_state.guid, block).await {
-                return v;
-            }
-        }
+        let key = match &self.state.zettacache {
+            Some(cache) => match cache.lookup(self.state.shared_state.guid, block).await {
+                LookupResponse::Present(v) => return v,
+                LookupResponse::Absent(l) => Some(l),
+            },
+            None => None,
+        };
 
         let object = self.state.object_block_map.block_to_object(block);
         let shared_state = self.state.shared_state.clone();
@@ -1398,10 +1401,13 @@
         let v = phys.blocks.get(&block).unwrap().to_owned().into_vec();
 
         // add to ZettaCache
-        if let Some(cache) = &self.state.zettacache {
+        if let Some(key) = key {
             // XXX clone() copies the data; would be nice to pass a reference
-            cache
-                .insert(self.state.shared_state.guid, block, v.clone())
+            self.state
+                .zettacache
+                .as_ref()
+                .unwrap()
+                .insert(key, v.clone())
                 .await;
         }
 