batching to improve zettacache ingestion performance (openzfs#795)
Blocks are ingested into the zettacache in two cases:
* when writing a new object, we add all its blocks to the zettacache
* when reading a block which is not in the zettacache, we get the object
  and add all its (not-already-present) blocks to the zettacache

In both cases, we are adding many blocks (up to ~300) that are part of
the same object.  The current code mostly handles each block
individually, causing repeated work, especially to lock and unlock
various data structures.

This commit streamlines the batch insertion of all the
(not-already-present) blocks in an object.  There are three main aspects to
this:
* bulk lookup: `zettacache::Inner::lookup_all_impl()` takes a list of
  keys and executes a callback for each of them, providing the
  IndexValue.  The Locked lock is obtained at most twice.
* bulk insert: `zettacache::Inner::insert_all_impl()` takes a list of
  keys and buffers, and writes all of them to disk.  The Locked lock is
  obtained once.
* bulk LockedKey: the new `RangeLock` is used to lock the range of keys
  covered by the object.  Only one lock is obtained for the object,
  instead of one lock from the LockSet for each block.

The performance of ingesting via reading random blocks is improved by
100-200% (i.e., performance is 2-3x what it was before).
ahrens authored Apr 14, 2023
1 parent cb4a904 commit 2cb25ba
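
To make the batch lookup concrete, here is a minimal, self-contained sketch of the pattern the first bullet describes: one acquisition of the index lock per batch instead of per block. This is a toy model, not the zettacache code; the `Key` and `IndexValue` types and the `Mutex<HashMap>` index are stand-ins for the real data structures and the `Locked` lock.

use std::collections::HashMap;
use std::sync::Mutex;

type Key = u64; // stand-in for the real block key type
#[derive(Clone, Copy, Debug, PartialEq)]
struct IndexValue(u64); // stand-in for the on-disk location

struct Index {
    map: Mutex<HashMap<Key, IndexValue>>,
}

impl Index {
    // Per-block pattern: one lock/unlock round trip for every key.
    fn lookup(&self, key: Key) -> Option<IndexValue> {
        self.map.lock().unwrap().get(&key).copied()
    }

    // Batched pattern (the shape lookup_all_impl is described as having):
    // the lock is taken once for the whole object, and the callback runs
    // for each key with its IndexValue, if any.
    fn lookup_all(&self, keys: &[Key], mut f: impl FnMut(Key, Option<IndexValue>)) {
        let map = self.map.lock().unwrap();
        for &key in keys {
            f(key, map.get(&key).copied());
        }
    }
}

fn main() {
    let index = Index {
        map: Mutex::new(HashMap::from([(7, IndexValue(42))])),
    };
    index.lookup_all(&[7, 8], |key, value| println!("{key}: {value:?}"));
    assert_eq!(index.lookup(8), None);
}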
Showing 10 changed files with 628 additions and 168 deletions.
9 changes: 9 additions & 0 deletions cmd/zfs_object_agent/Cargo.lock

(Generated lockfile; diff not rendered.)

2 changes: 2 additions & 0 deletions cmd/zfs_object_agent/util/Cargo.toml
@@ -24,6 +24,8 @@ derivative = "2.2.0"
enum-map = { version = "2.5.0", features = ["serde"] }
futures = "0.3.26"
humantime = "2.1.0"
+itertools = "0.10.5"
+iset = "0.2.2"
lazy_static = "1.4.0"
log = "0.4"
log4rs = "1.2.0"
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/util/src/lib.rs
@@ -20,6 +20,7 @@ pub mod measure;
pub mod message;
mod nicenum;
pub mod range_filter;
+pub mod range_lock;
mod range_tree;
pub mod semaphore_ext;
pub mod sequential;
333 changes: 333 additions & 0 deletions cmd/zfs_object_agent/util/src/range_lock.rs
@@ -0,0 +1,333 @@
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
use std::sync::Mutex;

use derivative::Derivative;
use iset::IntervalMap;
use tokio::sync::Notify;

use crate::super_trace;

/// The RangeLock allows locking of ranges of values, allowing exclusive access to the range.
/// Ranges are half-open [start, end): end is excluded, like the std::ops::Range type.
///
/// The implementation is based on a Range Tree, so operations are O(M + log N), where M is the
/// number of ranges that conflict (overlap) with the lock in question, and N is the total number
/// of ranges active (locked or waiting). The lock is fair: when ranges conflict, they will be
/// acquired in the order requested, and starvation is avoided. However, to achieve this a range
/// will wait when there are overlapping ranges waiting, even if the requested range is not
/// currently locked.
///
/// See also `LockSet`, which allows locking of single values.
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
pub struct RangeLock<V: PartialOrd + Copy> {
    inner: Mutex<Inner<V>>,
}

#[derive(Derivative)]
#[derivative(Default(bound = ""))]
struct Inner<V: PartialOrd + Copy> {
    intervals: IntervalMap<V, LockedRange>,
}

#[derive(Derivative)]
#[derivative(Debug)]
struct LockedRange {
    /// If zero, this range is locked. Otherwise we are waiting for this many overlapping ranges
    /// to unlock before it is our turn.
    waiting_on: usize,
    notify: Arc<Notify>,
}

impl<V: Ord + Copy + Debug + Send> RangeLock<V> {
    /// Lock the half-open range [start, end). End is excluded.
    async fn lock_impl(&self, start: V, end: V) {
        let range = start..end;
        let notify = {
            let mut inner = self.inner.lock().unwrap();

            super_trace!("{range:?}: locking...");
            // We need to wait for all the already-present overlapping ranges to unlock. This
            // may include ranges that are still waiting (their `waiting_on` is nonzero).
            let locked_range = LockedRange {
                waiting_on: inner.intervals.values(start..end).count(),
                notify: Arc::new(Notify::new()),
            };

            let notify = if locked_range.waiting_on > 0 {
                super_trace!(
                    "{range:?}: waiting on {} locks: {:?}...",
                    locked_range.waiting_on,
                    inner.intervals.intervals(start..end).collect::<Vec<_>>()
                );
                Some(locked_range.notify.clone())
            } else {
                None
            };

            inner.intervals.force_insert(start..end, locked_range);
            notify
        };

        if let Some(notify) = notify {
            notify.notified().await;
            super_trace!("{range:?}: ...woken up");
        }

        super_trace!("{range:?}: ...locked");
    }

    pub async fn lock_owned(self: Arc<Self>, range: Range<V>) -> OwnedRangeGuard<V> {
        self.lock_impl(range.start, range.end).await;
        OwnedRangeGuard {
            start: range.start,
            end: range.end,
            range_lock: self,
        }
    }

    pub async fn lock(&self, range: Range<V>) -> RangeGuard<'_, V> {
        self.lock_impl(range.start, range.end).await;
        RangeGuard {
            start: range.start,
            end: range.end,
            range_lock: self,
        }
    }

    fn unlock(&self, start: V, end: V) {
        let mut inner = self.inner.lock().unwrap();
        let range = start..end;
        super_trace!("{range:?}: releasing...");
        let removed = inner
            .intervals
            .remove_where(start..end, |locked_range| locked_range.waiting_on == 0)
            .unwrap();
        assert_eq!(removed.waiting_on, 0);
        // Decrement the waiting_on count of all the ranges that were waiting on us, which is all
        // the overlapping ranges.
        for (range, locked_range) in inner.intervals.iter_mut(start..end) {
            if locked_range.waiting_on == 0 {
                panic!("{range:?} already at 0");
            }
            locked_range.waiting_on -= 1;
            super_trace!("{range:?}: decremented to {}", locked_range.waiting_on);
            if locked_range.waiting_on == 0 {
                super_trace!("{range:?}: notifying");
                locked_range.notify.notify_one();
            }
        }
        super_trace!("{range:?}: ...released");
    }
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct RangeGuard<'a, V: Debug + Ord + Copy + Send> {
    start: V,
    end: V, // exclusive
    #[derivative(Debug = "ignore")]
    range_lock: &'a RangeLock<V>,
}

impl<'a, V: Debug + Ord + Copy + Send> Drop for RangeGuard<'a, V> {
    fn drop(&mut self) {
        self.range_lock.unlock(self.start, self.end);
    }
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct OwnedRangeGuard<V: Debug + Ord + Copy + Send> {
    start: V,
    end: V, // exclusive
    #[derivative(Debug = "ignore")]
    range_lock: Arc<RangeLock<V>>,
}

impl<V: Debug + Ord + Copy + Send> Drop for OwnedRangeGuard<V> {
    fn drop(&mut self) {
        self.range_lock.unlock(self.start, self.end);
    }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::time::Duration;
    use std::time::Instant;

    use rand::Rng;
    use tokio::sync::Barrier;
    use tokio::time::sleep;

    use super::*;

    #[tokio::test]
    async fn nonoverlapping_singleton() {
        let range_lock = RangeLock::default();
        let _l1 = range_lock.lock(1..2).await;
        let _l0 = range_lock.lock(0..1).await;
        let _l2 = range_lock.lock(2..3).await;
    }

    #[tokio::test]
    async fn nonoverlapping_spaced() {
        let range_lock = RangeLock::default();
        let _l1 = range_lock.lock(10..15).await;
        let _l0 = range_lock.lock(0..5).await;
        let _l2 = range_lock.lock(20..25).await;
    }

    #[tokio::test]
    async fn nonoverlapping_large() {
        let range_lock = RangeLock::default();
        let _l1 = range_lock.lock(10..20).await;
        let _l0 = range_lock.lock(0..10).await;
        let _l2 = range_lock.lock(20..30).await;
    }

    async fn two(start1: i32, end1: i32, start2: i32, end2: i32) {
        let range_lock = Arc::new(RangeLock::default());
        let released = Arc::new(AtomicBool::new(false));

        let l0 = range_lock.clone().lock_owned(start1..end1).await;

        let jh = {
            let range_lock = range_lock.clone();
            let released = released.clone();
            tokio::spawn(async move {
                let _l1 = range_lock.lock(start2..end2).await;
                assert!(released.load(Ordering::Relaxed));
            })
        };

        sleep(Duration::from_millis(100)).await;

        released.store(true, Ordering::Relaxed);
        drop(l0);
        jh.await.unwrap();
    }

    #[tokio::test]
    async fn identical() {
        two(10, 20, 10, 20).await;
    }

    #[tokio::test]
    async fn contains() {
        two(10, 20, 5, 25).await;
    }

    #[tokio::test]
    async fn contained() {
        two(10, 20, 13, 17).await;
    }

    #[tokio::test]
    async fn begin() {
        two(10, 20, 5, 15).await;
    }

    #[tokio::test]
    async fn end() {
        two(10, 20, 15, 25).await;
    }

    /// Test that all the `non_overlapping` ranges can be held at the same time, but they have to
    /// wait for `start..end` to be released. `start..end` must overlap with all the
    /// `non_overlapping` ranges, but the `non_overlapping` ranges should not overlap with
    /// each other.
    async fn many(
        start: i32,
        end: i32,
        non_overlapping: impl Iterator<Item = (i32, i32)> + ExactSizeIterator,
    ) {
        let range_lock = Arc::new(RangeLock::default());
        let released = Arc::new(AtomicBool::new(false));
        let barrier = Arc::new(Barrier::new(non_overlapping.len()));

        let l0 = range_lock.lock(start..end).await;

        let jh = {
            non_overlapping
                .map(|(start, end)| {
                    let range_lock = range_lock.clone();
                    let released = released.clone();
                    let barrier = barrier.clone();
                    tokio::spawn(async move {
                        let _locked_range = range_lock.lock(start..end).await;
                        // The first lock should be released.
                        assert!(released.load(Ordering::Relaxed));
                        barrier.wait().await;
                    })
                })
                .collect::<Vec<_>>()
        };

        sleep(Duration::from_millis(100)).await;

        released.store(true, Ordering::Relaxed);
        drop(l0);

        // All the spawned tasks should be able to acquire their locks at the same time, because
        // they are non-overlapping, and then enter and exit the barrier.
        for jh in jh {
            jh.await.unwrap();
        }
    }

    #[tokio::test]
    async fn more() {
        many(
            10,
            100,
            [
                (5, 11),
                (11, 12),
                (20, 22),
                (30, 31),
                (31, 32),
                (32, 33),
                (40, 45),
                (45, 50),
                (55, 103),
            ]
            .into_iter(),
        )
        .await;
    }

    #[tokio::test]
    async fn stress() {
        let range_lock = Arc::new(RangeLock::default());
        let deadline = Instant::now() + Duration::from_secs(20);

        let jh = {
            (0..100)
                .into_iter()
                .map(|_| {
                    let range_lock = range_lock.clone();
                    tokio::spawn(async move {
                        while Instant::now() < deadline {
                            let start = rand::thread_rng().gen_range(0..999);
                            let end = rand::thread_rng().gen_range(start + 1..1000);
                            let _locked_range = range_lock.lock(start..end).await;
                            let duration =
                                Duration::from_micros(rand::thread_rng().gen_range(0..100));
                            sleep(duration).await;
                        }
                    })
                })
                .collect::<Vec<_>>()
        };

        for jh in jh {
            jh.await.unwrap();
        }
    }
}
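
The following usage sketch is not part of the commit; it exercises the `RangeLock` API above in the style of the tests (the `u64` key type, the import path, and the tokio harness are illustrative assumptions):

use std::sync::Arc;
use util::range_lock::RangeLock; // path per util/src/lib.rs above

#[tokio::main]
async fn main() {
    let range_lock = Arc::new(RangeLock::default());

    // Lock the whole range of keys covered by one object at once,
    // instead of taking one LockSet entry per block.
    let object = range_lock.clone().lock_owned(1000u64..1300).await;

    // A non-overlapping range can be locked immediately.
    let _other = range_lock.lock(2000..2300).await;

    // An overlapping range must wait until `object` is dropped.
    let waiter = {
        let range_lock = range_lock.clone();
        tokio::spawn(async move {
            let _g = range_lock.lock(1200..1400).await;
        })
    };

    drop(object); // releases [1000, 1300) and wakes the waiter
    waiter.await.unwrap();
}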
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/util/src/zettacache_stats.rs
@@ -355,7 +355,7 @@ impl IoStats {
// Stat structs for zcache stats subcommand
//

-#[derive(Debug, Enum, Clone, Serialize, Deserialize)]
+#[derive(Debug, Enum, Copy, Clone, Serialize, Deserialize)]
/// The stats collected for zcache stats subcommand.
pub enum CacheStatCounter {
    // These stats are collected as part of the ZettaCache.stats struct.
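
A side note on the one-line change above: `EnumMap` is indexed by value, so deriving `Copy` lets a `CacheStatCounter` be bound once and reused across lookups without cloning. A standalone illustration under that assumption (the `Hits`/`Misses` variants are invented for the example):

use enum_map::{Enum, EnumMap};

#[derive(Debug, Enum, Copy, Clone)]
enum Counter {
    Hits,
    Misses,
}

fn main() {
    let mut stats: EnumMap<Counter, u64> = EnumMap::default();
    let c = Counter::Hits;
    stats[c] += 1; // indexing takes the key by value; Copy means `c`...
    stats[c] += 1; // ...is still usable here without an explicit clone
    assert_eq!(stats[Counter::Hits], 2);
    assert_eq!(stats[Counter::Misses], 0);
}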
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettacache/Cargo.toml
@@ -21,6 +21,7 @@ either = "1.8.1"
enum-map = "2.5.0"
futures = "0.3.26"
futures-core = "0.3.27"
+itertools = "0.10.5"
lazy_static = "1.4.0"
libc = "0.2"
log = "0.4"
(Diffs for the remaining four changed files are not rendered here.)