From 52e14b7511075ed5de5578effe7eadafd809be38 Mon Sep 17 00:00:00 2001 From: tabokie Date: Thu, 16 Mar 2023 09:20:10 +0800 Subject: [PATCH 1/6] add toggle Signed-off-by: tabokie --- include/rocksdb/write_buffer_manager.h | 23 ++++++++++++++++++----- memtable/write_buffer_manager.cc | 18 ++++++++++++++++-- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 33d876c3919..3c97638814a 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -56,13 +56,17 @@ class WriteBufferManager final { // when `flush_size` is triggered. By enabling this flag, the oldest mutable // memtable will be frozen instead. // + // - flush_deadline: Interval in seconds. Flush memtable immediately if its + // oldest key was written before `now - flush_deadline` and the total memory + // reaches `flush_size`. + // // - cache: if `cache` is provided, memtable memory will be charged as a // dummy entry This is useful to keep the memory sum of both memtable and // block cache under control. - explicit WriteBufferManager(size_t flush_size, - std::shared_ptr cache = {}, - float stall_ratio = 0.0, - bool flush_oldest_first = false); + explicit WriteBufferManager( + size_t flush_size, std::shared_ptr cache = {}, + float stall_ratio = 0.0, bool flush_oldest_first = false, + uint64_t flush_deadline = std::numeric_limits::max()); // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; WriteBufferManager& operator=(const WriteBufferManager&) = delete; @@ -97,6 +101,14 @@ class WriteBufferManager final { void SetFlushSize(size_t new_size); + void SetFlushOldestFirst(bool v) { + flush_oldest_first_.store(v, std::memory_order_relaxed); + } + + void SetDeadline(uint64_t deadline) { + flush_deadline_.store(deadline, std::memory_order_relaxed); + } + // Below functions should be called by RocksDB internally. // This handle is the same as the one created by `DB::Open` or @@ -191,7 +203,8 @@ class WriteBufferManager final { std::atomic flush_size_; // Only used when flush_size is non-zero. std::atomic memory_active_; - const bool flush_oldest_first_; + std::atomic flush_oldest_first_; + std::atomic flush_deadline_; const bool allow_stall_; const float stall_ratio_; diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index 3c3a09d6a0c..b2c0f64ae91 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -20,11 +20,13 @@ namespace ROCKSDB_NAMESPACE { WriteBufferManager::WriteBufferManager(size_t _flush_size, std::shared_ptr cache, float stall_ratio, - bool flush_oldest_first) + bool flush_oldest_first, + uint64_t flush_deadline) : memory_used_(0), flush_size_(_flush_size), memory_active_(0), flush_oldest_first_(flush_oldest_first), + flush_deadline_(flush_deadline), allow_stall_(stall_ratio >= 1.0), stall_ratio_(stall_ratio), stall_active_(false), @@ -184,12 +186,24 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) { uint64_t candidate_size = 0; uint64_t max_score = 0; uint64_t current_score = 0; + + uint64_t deadline_interval = flush_deadline_.load(std::memory_order_relaxed); + uint64_t deadline_time = 0; + if (deadline_interval != std::numeric_limits::max()) { + uint64_t current; + SystemClock::Default()->GetCurrentTime(¤t); + deadline_time = current - deadline_interval; + } for (auto& s : sentinels_) { uint64_t current_memory_bytes = std::numeric_limits::max(); uint64_t oldest_time = std::numeric_limits::max(); s->db->GetApproximateActiveMemTableStats(s->cf, ¤t_memory_bytes, &oldest_time); - if (flush_oldest_first_) { + if (oldest_time < deadline_time) { + candidate = s.get(); + candidate_size = current_memory_bytes; + break; + } else if (flush_oldest_first_.load(std::memory_order_relaxed)) { // Convert oldest to highest score. current_score = std::numeric_limits::max() - oldest_time; } else { From 2c0007fde932589bddc22c36d0e87bb1ae11d162 Mon Sep 17 00:00:00 2001 From: tabokie Date: Mon, 3 Apr 2023 11:17:09 +0800 Subject: [PATCH 2/6] protect underflow Signed-off-by: tabokie --- memtable/write_buffer_manager.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index b2c0f64ae91..b6c6da30d20 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -192,7 +192,9 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) { if (deadline_interval != std::numeric_limits::max()) { uint64_t current; SystemClock::Default()->GetCurrentTime(¤t); - deadline_time = current - deadline_interval; + if (current > deadline_interval) { + deadline_time = current - deadline_interval; + } } for (auto& s : sentinels_) { uint64_t current_memory_bytes = std::numeric_limits::max(); From 8a608c6bbd24a1880a0aad57934ca9c9fc1dc6d2 Mon Sep 17 00:00:00 2001 From: tabokie Date: Mon, 3 Apr 2023 12:16:19 +0800 Subject: [PATCH 3/6] fix build Signed-off-by: tabokie --- memtable/write_buffer_manager.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index b6c6da30d20..dbf447e79a4 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -190,10 +190,13 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) { uint64_t deadline_interval = flush_deadline_.load(std::memory_order_relaxed); uint64_t deadline_time = 0; if (deadline_interval != std::numeric_limits::max()) { - uint64_t current; - SystemClock::Default()->GetCurrentTime(¤t); - if (current > deadline_interval) { - deadline_time = current - deadline_interval; + int64_t current; + auto s = SystemClock::Default()->GetCurrentTime(¤t); + if (s.ok()) { + assert(current > 0); + if (static_cast(current) > deadline_interval) { + deadline_time = static_cast(current) - deadline_interval; + } } } for (auto& s : sentinels_) { From e63eeb0e98ad0718cba04a87530f45060ea0e408 Mon Sep 17 00:00:00 2001 From: tabokie Date: Wed, 19 Apr 2023 10:01:06 +0800 Subject: [PATCH 4/6] remove deadline and add penalty for l0 files Signed-off-by: tabokie --- include/rocksdb/write_buffer_manager.h | 17 +++---------- memtable/write_buffer_manager.cc | 34 +++++++++++--------------- 2 files changed, 18 insertions(+), 33 deletions(-) diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 3c97638814a..f2b93d8aa55 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -56,17 +56,13 @@ class WriteBufferManager final { // when `flush_size` is triggered. By enabling this flag, the oldest mutable // memtable will be frozen instead. // - // - flush_deadline: Interval in seconds. Flush memtable immediately if its - // oldest key was written before `now - flush_deadline` and the total memory - // reaches `flush_size`. - // // - cache: if `cache` is provided, memtable memory will be charged as a // dummy entry This is useful to keep the memory sum of both memtable and // block cache under control. - explicit WriteBufferManager( - size_t flush_size, std::shared_ptr cache = {}, - float stall_ratio = 0.0, bool flush_oldest_first = false, - uint64_t flush_deadline = std::numeric_limits::max()); + explicit WriteBufferManager(size_t flush_size, + std::shared_ptr cache = {}, + float stall_ratio = 0.0, + bool flush_oldest_first = false); // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; WriteBufferManager& operator=(const WriteBufferManager&) = delete; @@ -105,10 +101,6 @@ class WriteBufferManager final { flush_oldest_first_.store(v, std::memory_order_relaxed); } - void SetDeadline(uint64_t deadline) { - flush_deadline_.store(deadline, std::memory_order_relaxed); - } - // Below functions should be called by RocksDB internally. // This handle is the same as the one created by `DB::Open` or @@ -204,7 +196,6 @@ class WriteBufferManager final { // Only used when flush_size is non-zero. std::atomic memory_active_; std::atomic flush_oldest_first_; - std::atomic flush_deadline_; const bool allow_stall_; const float stall_ratio_; diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index dbf447e79a4..5242b8d5d07 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -20,13 +20,11 @@ namespace ROCKSDB_NAMESPACE { WriteBufferManager::WriteBufferManager(size_t _flush_size, std::shared_ptr cache, float stall_ratio, - bool flush_oldest_first, - uint64_t flush_deadline) + bool flush_oldest_first) : memory_used_(0), flush_size_(_flush_size), memory_active_(0), flush_oldest_first_(flush_oldest_first), - flush_deadline_(flush_deadline), allow_stall_(stall_ratio >= 1.0), stall_ratio_(stall_ratio), stall_active_(false), @@ -187,33 +185,29 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) { uint64_t max_score = 0; uint64_t current_score = 0; - uint64_t deadline_interval = flush_deadline_.load(std::memory_order_relaxed); - uint64_t deadline_time = 0; - if (deadline_interval != std::numeric_limits::max()) { - int64_t current; - auto s = SystemClock::Default()->GetCurrentTime(¤t); - if (s.ok()) { - assert(current > 0); - if (static_cast(current) > deadline_interval) { - deadline_time = static_cast(current) - deadline_interval; - } - } - } for (auto& s : sentinels_) { + // TODO: move this calculation to a callback. uint64_t current_memory_bytes = std::numeric_limits::max(); uint64_t oldest_time = std::numeric_limits::max(); s->db->GetApproximateActiveMemTableStats(s->cf, ¤t_memory_bytes, &oldest_time); - if (oldest_time < deadline_time) { - candidate = s.get(); - candidate_size = current_memory_bytes; - break; - } else if (flush_oldest_first_.load(std::memory_order_relaxed)) { + if (flush_oldest_first_.load(std::memory_order_relaxed)) { // Convert oldest to highest score. current_score = std::numeric_limits::max() - oldest_time; } else { current_score = current_memory_bytes; } + // A very mild penalty for too many L0 files. + uint64_t level0; + if (s->db->GetProperty(kNumFilesAtLevelPrefix + "0", &level0).ok() && + level0 >= 4) { + // 4->2, 5->4, 6->8, 7->12, 8->18 + uint64_t factor = (level0 - 2) * (level0 - 2) / 2; + if (factor > 100) { + factor = 100; + } + current_score = current_score * (100 - factor) / factor; + } if (current_score > max_score) { candidate = s.get(); max_score = current_score; From bbb0c2a6ca8e51bbe510e443c47da883aed12f22 Mon Sep 17 00:00:00 2001 From: tabokie Date: Wed, 19 Apr 2023 10:17:57 +0800 Subject: [PATCH 5/6] fix build Signed-off-by: tabokie --- memtable/write_buffer_manager.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index 5242b8d5d07..b29bb25c568 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -199,7 +199,8 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) { } // A very mild penalty for too many L0 files. uint64_t level0; - if (s->db->GetProperty(kNumFilesAtLevelPrefix + "0", &level0).ok() && + if (s->db->GetIntProperty(DB::Properties::kNumFilesAtLevelPrefix + "0", + &level0) && level0 >= 4) { // 4->2, 5->4, 6->8, 7->12, 8->18 uint64_t factor = (level0 - 2) * (level0 - 2) / 2; From 3039584b390d1ddc0f01ada7d7050ab5b9fd637f Mon Sep 17 00:00:00 2001 From: tabokie Date: Thu, 20 Apr 2023 11:13:21 +0800 Subject: [PATCH 6/6] consider compaction trigger Signed-off-by: tabokie --- memtable/write_buffer_manager.cc | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index b29bb25c568..2fec88dd9aa 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -199,15 +199,23 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) { } // A very mild penalty for too many L0 files. uint64_t level0; + // 3 is to optimize the frequency of getting options, which uses mutex. if (s->db->GetIntProperty(DB::Properties::kNumFilesAtLevelPrefix + "0", &level0) && - level0 >= 4) { - // 4->2, 5->4, 6->8, 7->12, 8->18 - uint64_t factor = (level0 - 2) * (level0 - 2) / 2; - if (factor > 100) { - factor = 100; + level0 >= 3) { + auto opts = s->db->GetOptions(s->cf); + if (opts.level0_file_num_compaction_trigger > 0 && + level0 >= + static_cast(opts.level0_file_num_compaction_trigger)) { + auto diff = level0 - static_cast( + opts.level0_file_num_compaction_trigger); + // 0->2, +1->4, +2->8, +3->12, +4->18 + uint64_t factor = (diff + 2) * (diff + 2) / 2; + if (factor > 100) { + factor = 100; + } + current_score = current_score * (100 - factor) / factor; } - current_score = current_score * (100 - factor) / factor; } if (current_score > max_score) { candidate = s.get();