From 7d31f9ec05212045608aa6b9b35352a860dbf3da Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Sun, 14 Apr 2024 16:51:14 +0300 Subject: [PATCH 1/4] WBM: Fix stall deadlock with multiple cfs With a setting of multiple cfs and WriteBufferManager with allow_stall, the DB can enter a deadlock when the WBM initiates a stall. This happens since only the oldest cf is picked for flush when HandleWriteBufferManagerFlush is called to flush the data and prevent the stall. When using multiple CFs, this does not ensure the FreeMem will evict enough memory to prevent a stall and no other flush is scheduled. To fix this, add cfs to the flush queue so that we'll be below the mutable_limit_. --- db/db_impl/db_impl_write.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index f6090e341f..d63ab3fbda 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1747,9 +1747,9 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { if (immutable_db_options_.atomic_flush) { SelectColumnFamiliesForAtomicFlush(&cfds); } else { - ColumnFamilyData* cfd_picked = nullptr; - SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; - + int64_t total_mem_to_free = + write_buffer_manager()->mutable_memtable_memory_usage() - + write_buffer_manager()->buffer_size() * 7 / 8; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; @@ -1759,16 +1759,15 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { // and no immutable memtables for which flush has yet to finish. If // we triggered flush on CFs already trying to flush, we would risk // creating too many immutable memtables leading to write stalls. 
- uint64_t seq = cfd->mem()->GetCreationSeq(); - if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { - cfd_picked = cfd; - seq_num_for_cf_picked = seq; + auto mem_used = cfd->mem()->ApproximateMemoryUsageFast(); + cfds.push_back(cfd); + total_mem_to_free -= mem_used; + if (total_mem_to_free <= 0) { + break; } } } - if (cfd_picked != nullptr) { - cfds.push_back(cfd_picked); - } + MaybeFlushStatsCF(&cfds); } if (!cfds.empty()) { From e7714c48afef38c483c7c8c852361166e16dcdbc Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Sun, 14 Apr 2024 16:53:53 +0300 Subject: [PATCH 2/4] update history --- HISTORY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/HISTORY.md b/HISTORY.md index a28ceefca3..255a811d6d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,7 @@ ### Bug Fixes * LOG Consistency:Display the pinning policy options same as block cache options / metadata cache options (#804). +* WBM: fix allow_stall deadlock with multiple cfs. ### Miscellaneous * WriteController logging: Remove redundant reports when WC is not shared between dbs From 9f5004faec796031a2a9e315554562f8c0222310 Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Sun, 14 Apr 2024 18:04:24 +0300 Subject: [PATCH 3/4] prettify --- db/db_impl/db_impl_write.cc | 8 ++++++-- include/rocksdb/write_buffer_manager.h | 8 +++++++- memtable/write_buffer_manager.cc | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index d63ab3fbda..ae0d68fbd4 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1747,9 +1747,13 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { if (immutable_db_options_.atomic_flush) { SelectColumnFamiliesForAtomicFlush(&cfds); } else { + // As part of https://github.com/speedb-io/speedb/pull/859, there's a need to + // schedule more flushes since the cfd picked for flush was the oldest one + // and not necessarily enough to resolve the stall issue. 
+ // For this reason, schedule enough flushes so that the memory usage is at + // least below the flush trigger (kMutableLimit * buffer_size) int64_t total_mem_to_free = - write_buffer_manager()->mutable_memtable_memory_usage() - - write_buffer_manager()->buffer_size() * 7 / 8; + write_buffer_manager()->memory_above_flush_trigger(); for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 691280ca32..52eaf0b825 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -67,6 +67,8 @@ class WriteBufferManager final { static constexpr uint64_t kNoDelayedWriteFactor = 0U; static constexpr uint64_t kMaxDelayedWriteFactor = 100U; static constexpr uint64_t kStopDelayedWriteFactor = kMaxDelayedWriteFactor; + static constexpr double kMutableLimit = 0.875; + enum class UsageState { kNone, kDelay, kStop }; public: @@ -152,6 +154,10 @@ class WriteBufferManager final { return ((inactive >= total) ? 0 : (total - inactive)); } + int64_t memory_above_flush_trigger() { + return mutable_memtable_memory_usage() - buffer_size() * kMutableLimit; + } + // Returns the total inactive memory used by memtables. 
size_t immmutable_memtable_memory_usage() const { return memory_inactive_.load(std::memory_order_relaxed); @@ -180,7 +186,7 @@ class WriteBufferManager final { [[maybe_unused]] auto was_enabled = enabled(); buffer_size_.store(new_size, std::memory_order_relaxed); - mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed); + mutable_limit_.store(new_size * kMutableLimit, std::memory_order_relaxed); assert(was_enabled == enabled()); diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index bdfc389798..87fb3cb656 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -54,7 +54,7 @@ WriteBufferManager::WriteBufferManager( const FlushInitiationOptions& flush_initiation_options, uint16_t start_delay_percent) : buffer_size_(_buffer_size), - mutable_limit_(buffer_size_ * 7 / 8), + mutable_limit_(buffer_size_ * kMutableLimit), memory_used_(0), memory_inactive_(0), memory_being_freed_(0U), From 3026791bbf57b1519d7fd91de6ad87ba0dc27171 Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Mon, 15 Apr 2024 12:48:47 +0300 Subject: [PATCH 4/4] use ApproximateMemoryUsage instead of ApproximateMemoryUsageFast and memory_usage() instead of mutable_memtable_memory_usage() --- db/db_impl/db_impl_write.cc | 2 +- include/rocksdb/write_buffer_manager.h | 2 +- memtable/write_buffer_manager.cc | 13 ++++++++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index ae0d68fbd4..250d38251d 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1763,7 +1763,7 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { // and no immutable memtables for which flush has yet to finish. If // we triggered flush on CFs already trying to flush, we would risk // creating too many immutable memtables leading to write stalls. 
- auto mem_used = cfd->mem()->ApproximateMemoryUsageFast(); + auto mem_used = cfd->mem()->ApproximateMemoryUsage(); cfds.push_back(cfd); total_mem_to_free -= mem_used; if (total_mem_to_free <= 0) { diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 52eaf0b825..d5a05bd5cf 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -155,7 +155,7 @@ class WriteBufferManager final { } int64_t memory_above_flush_trigger() { - return mutable_memtable_memory_usage() - buffer_size() * kMutableLimit; + return memory_usage() - buffer_size() * kMutableLimit; } // Returns the total inactive memory used by memtables. diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index 87fb3cb656..b73cbc8616 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -112,6 +112,12 @@ void WriteBufferManager::ReserveMem(size_t mem) { memory_used_.fetch_add(mem, std::memory_order_relaxed); new_memory_used = old_memory_used + mem; } + for (auto loggers : loggers_to_client_ids_map_) { + ROCKS_LOG_WARN(loggers.first, + "WBM (%p) ReserveMem called with: %" PRIu64 + " , memory_used: %" PRIu64, + this, mem, new_memory_used); + } if (is_enabled) { UpdateUsageState(new_memory_used, static_cast(mem), buffer_size()); // Checking outside the locks is not reliable, but avoids locking @@ -177,7 +183,12 @@ void WriteBufferManager::FreeMem(size_t mem) { assert(old_memory_used >= mem); new_memory_used = old_memory_used - mem; } - + for (auto loggers : loggers_to_client_ids_map_) { + ROCKS_LOG_WARN(loggers.first, + "WBM (%p) FreeMem called with: %" PRIu64 + ", memory_used: %" PRIu64, + this, mem, new_memory_used); + } if (is_enabled) { [[maybe_unused]] const auto curr_memory_inactive = memory_inactive_.fetch_sub(mem, std::memory_order_relaxed);