From d58469e967b611e8489f87d8792c9124541e9f74 Mon Sep 17 00:00:00 2001
From: lucasliang
Date: Thu, 4 Jan 2024 17:27:27 +0800
Subject: [PATCH] rewrite: optimize the interval of `sync` when rewriting memtables. (#347)

In a cloud environment, skipping sync operations entirely while
rewriting memtables can result in an accumulation of unsynced bytes in
the buffer. This accumulation may block the foreground write progress
when a sync is eventually issued.

This pull request introduces periodic sync operations during the
rewrite: whenever the amount of stashed unsynced bytes exceeds a
predefined threshold, the rewrite issues a sync itself. This
optimization addresses the issue and improves performance.

Signed-off-by: lucasliang
---
 src/purge.rs | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/purge.rs b/src/purge.rs
index b1183438..e96c5759 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -35,6 +35,10 @@ fn max_batch_bytes() -> usize {
     128 * 1024
 }
 
+fn max_forcely_sync_bytes() -> usize {
+    max_batch_bytes() * 4
+}
+
 pub struct PurgeManager<P>
 where
     P: PipeLog,
@@ -354,6 +358,7 @@ where
             let mut current_entry_indexes = Vec::new();
             let mut current_entries = Vec::new();
             let mut current_size = 0;
+            let mut unsynced_size = 0;
             // Split the entries into smaller chunks, so that we don't OOM, and the
             // compression overhead is not too high.
             let mut entry_indexes = entry_indexes.into_iter().peekable();
@@ -362,6 +367,7 @@ where
                 current_size += entry.len();
                 current_entries.push(entry);
                 current_entry_indexes.push(ei);
+                unsynced_size += current_size;
                 // If this is the last entry, we handle them outside the loop.
                 if entry_indexes.peek().is_some()
                     && current_size + previous_size > max_batch_bytes()
@@ -396,7 +402,15 @@ where
                     )?;
                     current_size = 0;
                     previous_size = 0;
-                    let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
+                    let sync = if unsynced_size >= max_forcely_sync_bytes() {
+                        // Avoid stashing too many unsynced bytes, which could block
+                        // the later `fdatasync` in the append path for too long.
+                        unsynced_size = 0;
+                        true
+                    } else {
+                        false
+                    };
+                    let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
                     if needs_atomicity && atomic_group_start.is_none() {
                         atomic_group_start = Some(handle.id.seq);
                     }
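
For context, the pattern this diff applies can be shown with a minimal,
self-contained sketch: entries are flushed in roughly 128 KiB chunks, and a
forced sync is issued once about 512 KiB (`max_batch_bytes() * 4`) of unsynced
data has piled up, so the deferred sync cost never lands on the foreground
write path all at once. The names below (`Writer`, `flush_chunk`,
`MAX_BATCH_BYTES`, `SYNC_THRESHOLD`) are illustrative stand-ins and not part of
the raft-engine API; the sketch models the intended bookkeeping rather than
copying the patched function line for line.

```rust
const MAX_BATCH_BYTES: usize = 128 * 1024; // mirrors max_batch_bytes()
const SYNC_THRESHOLD: usize = MAX_BATCH_BYTES * 4; // mirrors max_forcely_sync_bytes()

struct Writer {
    buffered: Vec<Vec<u8>>, // entries staged for the next chunk
    chunk_size: usize,      // bytes staged in the current chunk
    unsynced: usize,        // bytes written since the last sync
}

impl Writer {
    fn new() -> Self {
        Writer { buffered: Vec::new(), chunk_size: 0, unsynced: 0 }
    }

    fn write_entries(&mut self, entries: impl IntoIterator<Item = Vec<u8>>) -> std::io::Result<()> {
        for entry in entries {
            self.chunk_size += entry.len();
            self.buffered.push(entry);
            if self.chunk_size > MAX_BATCH_BYTES {
                // Once enough unsynced bytes have accumulated, pay the sync
                // cost now instead of letting a later foreground sync absorb it.
                self.unsynced += self.chunk_size;
                let sync = self.unsynced >= SYNC_THRESHOLD;
                if sync {
                    self.unsynced = 0;
                }
                self.flush_chunk(sync)?;
                self.chunk_size = 0;
            }
        }
        // The final chunk is always synced, like the tail of the rewrite.
        self.flush_chunk(true)
    }

    fn flush_chunk(&mut self, sync: bool) -> std::io::Result<()> {
        // Stand-in for rewrite_impl(): write the staged entries and, when
        // `sync` is true, also flush them to durable storage.
        let _ = (&self.buffered, sync);
        self.buffered.clear();
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut w = Writer::new();
    // 1 MiB of 1 KiB entries: chunks flush at ~128 KiB, and roughly every
    // fourth flush also forces a sync.
    w.write_entries((0..1024).map(|_| vec![0u8; 1024]))
}
```

With these numbers, a forced sync happens at most once per ~512 KiB of rewritten
data, which bounds how much buffered data a later append-path sync can be asked
to flush in one go.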