Skip to content

Commit

Permalink
rewrite: optimize the interval of sync when rewriting memtables. (t…
Browse files Browse the repository at this point in the history
…ikv#347)

In a cloud environment, refraining from unscheduling sync operations
when rewriting memtables might result in an accumulation of unsynced bytes
in the buffer. This accumulation has the potential to impede the foreground
write progress during sync.

This pull request introduces periodic sync operations when the amount of
stashed unsynced bytes exceeds a predefined threshold. This optimization
aims to address the issue and enhance performance.

Signed-off-by: lucasliang <[email protected]>
  • Loading branch information
LykxSassinator authored Jan 4, 2024
1 parent d043b4a commit e505d63
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ fn max_batch_bytes() -> usize {
128 * 1024
}

fn max_forcely_sync_bytes() -> usize {
max_batch_bytes() * 4
}

pub struct PurgeManager<P>
where
P: PipeLog,
Expand Down Expand Up @@ -354,6 +358,7 @@ where
let mut current_entry_indexes = Vec::new();
let mut current_entries = Vec::new();
let mut current_size = 0;
let mut unsynced_size = 0;
// Split the entries into smaller chunks, so that we don't OOM, and the
// compression overhead is not too high.
let mut entry_indexes = entry_indexes.into_iter().peekable();
Expand All @@ -362,6 +367,7 @@ where
current_size += entry.len();
current_entries.push(entry);
current_entry_indexes.push(ei);
unsynced_size += current_size;
// If this is the last entry, we handle them outside the loop.
if entry_indexes.peek().is_some()
&& current_size + previous_size > max_batch_bytes()
Expand Down Expand Up @@ -396,7 +402,15 @@ where
)?;
current_size = 0;
previous_size = 0;
let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
let sync = if unsynced_size >= max_forcely_sync_bytes() {
// Avoiding too many unsynced size can make the later `fdatasync` in
// the append progress blocked for too long.
unsynced_size = 0;
true
} else {
false
};
let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
if needs_atomicity && atomic_group_start.is_none() {
atomic_group_start = Some(handle.id.seq);
}
Expand Down

0 comments on commit e505d63

Please sign in to comment.