Skip to content

Commit

Permalink
Tune zettacache index merge channels to reduce memory usage (openzfs#639
Browse files Browse the repository at this point in the history
)

During a zettacache index merge, memory usage can increase drastically,
e.g. from 8GB to 19GB, primarily due to memory used by the
`IndexMessage`s (which are passed from the `merge_task()` to the
`next_index_task()`) and `MergeMessage`s (which is passed from the
`next_index_task()` to the `checkpoint_task()`).

IndexMessages take 62MB each, and MergeMessages take 39MB each, due to
the allocation of `MERGE_PROGRESS_CHUNK` (1 million) entries in each of
their `entries`, `frees`, and `cache_updates` Vec's.  There can be up to
100 of each of these messages in the channels, for a total of 10GB.

This commit reduces the memory usage in two ways:

Reduce the number of IndexMessages from 100 to 10.  This has little
impact on merge performance, because the next_index_task() can process
messages at a relatively constant rate.

Reduce the size of the `frees` and `cache_updates` Vec's in the
`MergeMessage` to be just big enough to contain their used entries,
which is typically very few, because there are few frees/updates
compared to the number of `entries`.

This reduces the typical maximum memory used by these messages from 10GB
to <1GB, with negligible impact on merge performance.
  • Loading branch information
ahrens authored Oct 7, 2022
1 parent 3dadc35 commit f2ceb3e
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use util::measure;
use util::nice_p2size;
use util::super_trace;
use util::tunable;
use util::with_alloctag;
use util::zettacache_stats::CacheStatCounter::*;
use util::zettacache_stats::CacheStats;

Expand All @@ -42,6 +43,20 @@ use crate::slab_allocator::SlabAllocatorBuilder;

tunable! {
static ref MERGE_PROGRESS_CHUNK: usize = 1_000_000;

// The size of the channel from merge_task() to next_index_task(). Each message in the
// channel allocates memory for MERGE_PROGRESS_CHUNK new entries, frees, and cache_updates
// (even if there are very few frees/cache_updates in this message), totaling 62MB per
// channel entry. Therefore, to keep memory usage down during a merge, this should be kept
// as low as possible while still achieving good performance of the merge.
static ref MERGE_CHANNEL_BUFFER: usize = 10;

// The size of the channel from next_index_task() to checkpoint_task(). Each message in the
// channel allocates memory for the frees and cache_updates that are actually occupied, and
// the new entries are not present at all. Therefore, this can be relatively large without
// impacting memory usage much, which allows the merge to make progress while the
// checkpoint_task() is busy writing the checkpoint and not receiving messages.
static ref INDEX_CHANNEL_BUFFER: usize = 100;
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -155,14 +170,14 @@ impl Progress {
chunk_len,
tx,
last_key: None,
entries: Vec::with_capacity(chunk_len),
entries: with_alloctag("merge entries", || Vec::with_capacity(chunk_len)),
entries_atimes: AtimeHistogramPhys::with_capacity(
first_ghost,
first_live,
histogram_len,
),
frees: Vec::with_capacity(chunk_len),
cache_updates: Vec::with_capacity(chunk_len),
frees: with_alloctag("merge frees", || Vec::with_capacity(chunk_len)),
cache_updates: with_alloctag("merge cache_updates", || Vec::with_capacity(chunk_len)),
obsoleted: AtimeHistogramPhys::with_capacity(first_ghost, first_live, histogram_len),
timer: Instant::now(),
}
Expand Down Expand Up @@ -259,13 +274,16 @@ impl Progress {
measure!("Progress::report() tx.send(IndexMessage)")
.fut_timed(self.tx.send(IndexMessage {
last_key,
entries: mem::replace(&mut self.entries, Vec::with_capacity(self.chunk_len)),
entries: with_alloctag("merge entries", || {
mem::replace(&mut self.entries, Vec::with_capacity(self.chunk_len))
}),
entries_atimes: self.entries_atimes.take(),
frees: mem::replace(&mut self.frees, Vec::with_capacity(self.chunk_len)),
cache_updates: mem::replace(
&mut self.cache_updates,
Vec::with_capacity(self.chunk_len),
),
frees: with_alloctag("merge frees", || {
mem::replace(&mut self.frees, Vec::with_capacity(self.chunk_len))
}),
cache_updates: with_alloctag("merge cache_updates", || {
mem::replace(&mut self.cache_updates, Vec::with_capacity(self.chunk_len))
}),
obsoleted_atimes: self.obsoleted.take(),
}))
.await
Expand Down Expand Up @@ -311,21 +329,28 @@ impl MergeState {
next_index: &mut IndexRun,
) {
let begin = Instant::now();
while let Some(message) = merge_rx.recv().await {
while let Some(mut message) = merge_rx.recv().await {
next_index.append(message.entries, &message.entries_atimes);
// The "last key" from the appended entries may not be the last key we actually
// processed in the merge (e.g, we may have evicted some entries later)
next_index.update_last_key(message.last_key);
checkpoint_tx
.send(
MergeMessage::new_progress(
next_index,
message.frees,
message.cache_updates,
message.obsoleted_atimes,
)
.await,
)

// These Vec's were allocated with MERGE_PROGRESS_CHUNK capacity, which is likely
// much more than was needed. Note that shrinking might require a bcopy, so we wait
// until we're out of the critical path of merge_task() to do it. See also the
// comments for MERGE_CHANNEL_BUFFER and INDEX_CHANNEL_BUFFER.
message.frees.shrink_to_fit();
message.cache_updates.shrink_to_fit();

let merge_message = MergeMessage::new_progress(
next_index,
message.frees,
message.cache_updates,
message.obsoleted_atimes,
)
.await;
measure!("MergeState::next_index_task() tx.send(MergeMessage)")
.fut_timed(checkpoint_tx.send(merge_message))
.await
.unwrap_or_else(|e| panic!("couldn't send: {}", e));
}
Expand Down Expand Up @@ -506,8 +531,8 @@ impl MergeState {
// The checkpoint task will be constantly reading from the channel, so we don't really
// need much of a buffer here. We use 100 because we might accumulate some messages while
// actually flushing out the checkpoint.
let (index_tx, checkpoint_rx) = mpsc::channel(100);
let (merge_tx, index_rx) = mpsc::channel(100);
let (merge_tx, index_rx) = mpsc::channel(*MERGE_CHANNEL_BUFFER);
let (index_tx, checkpoint_rx) = mpsc::channel(*INDEX_CHANNEL_BUFFER);

let start_key = next_index.last_key();

Expand Down

0 comments on commit f2ceb3e

Please sign in to comment.