Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Charge blob cache usage against the global memory limit #10206

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion cache/cache_reservation_manager.cc
Original file line number Diff line number Diff line change
@@ -43,7 +43,9 @@ CacheReservationManagerImpl<R>::CacheReservationManagerImpl(
std::shared_ptr<Cache> cache, bool delayed_decrease)
: delayed_decrease_(delayed_decrease),
cache_allocated_size_(0),
memory_used_(0) {
memory_used_(0),
blob_cache_size_(0),
dummy_blobs_handle_(nullptr) {
assert(cache != nullptr);
cache_ = cache;
}
@@ -53,6 +55,9 @@ CacheReservationManagerImpl<R>::~CacheReservationManagerImpl() {
for (auto* handle : dummy_handles_) {
cache_->Release(handle, true);
}
if (dummy_blobs_handle_ != nullptr) {
cache_->Release(dummy_blobs_handle_, true);
}
}

template <CacheEntryRole R>
@@ -85,6 +90,27 @@ Status CacheReservationManagerImpl<R>::UpdateCacheReservation(
}
}

template <CacheEntryRole R>
Status CacheReservationManagerImpl<R>::UpdateBlobCacheReservation(
std::shared_ptr<Cache> blob_cache) {
Status s = Status::OK();
if (blob_cache != cache_) {
std::size_t new_memory_used = blob_cache->GetUsage();
if (new_memory_used != blob_cache_size_.load(std::memory_order_relaxed)) {
if (dummy_blobs_handle_ != nullptr) {
cache_->Release(dummy_blobs_handle_, true);
}
s = cache_->Insert(GetNextCacheKey(), nullptr, new_memory_used,
GetNoopDeleterForRole<R>(), &dummy_blobs_handle_);
if (s != Status::OK()) {
return s;
}
blob_cache_size_ = new_memory_used;
}
}
return s;
}

template <CacheEntryRole R>
Status CacheReservationManagerImpl<R>::MakeCacheReservation(
std::size_t incremental_memory_used,
34 changes: 31 additions & 3 deletions cache/cache_reservation_manager.h
Original file line number Diff line number Diff line change
@@ -42,6 +42,8 @@ class CacheReservationManager {
// `UpdateCacheReservation` function
virtual Status UpdateCacheReservation(std::size_t memory_used_delta,
bool increase) = 0;
virtual Status UpdateBlobCacheReservation(
std::shared_ptr<Cache> blob_cache) = 0;
virtual Status MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
@@ -114,7 +116,7 @@ class CacheReservationManagerImpl
// (and new_memory_used < cache_allocated_size_ * 3/4
// when delayed_decrease is set true);
//
// Keey dummy entries the same if (1) new_memory_used == cache_allocated_size_
// Keep dummy entries the same if (1) new_memory_used == cache_allocated_size_
// or (2) new_memory_used is in the interval of
// [cache_allocated_size_ * 3/4, cache_allocated_size) when delayed_decrease
// is set true.
@@ -139,6 +141,23 @@ class CacheReservationManagerImpl
return Status::NotSupported();
}

// Update the "blob cache" dummy entry in the block cache to match the memory
// usage of the blob cache against the global memory limit.
//
// Skip the dummy entry if blob cache and block cache are same
// backing cache.
//
// Keep the dummy entry the same if blob cache usage didn't
// change;
//
// Otherwise update this dummy entry.
//
// @param blob_cache The blob cache used by BlobDB
//
// @return It returns Status::OK() if the dummy entry updates succeed.
// Otherwise, it returns the non-ok status;
Status UpdateBlobCacheReservation(std::shared_ptr<Cache> blob_cache) override;

// One of the two ways of reserving cache space and releasing is done through
// destruction of CacheReservationHandle.
// See UpdateCacheReservation() for the other way.
@@ -213,10 +232,13 @@ class CacheReservationManagerImpl

std::shared_ptr<Cache> cache_;
bool delayed_decrease_;
std::atomic<std::size_t> cache_allocated_size_;
std::size_t memory_used_;
std::atomic<std::size_t> cache_allocated_size_; // no blob cache usage
std::size_t memory_used_; // no blob cache usage
std::vector<Cache::Handle *> dummy_handles_;
CacheKey cache_key_;

std::atomic<std::size_t> blob_cache_size_;
Cache::Handle *dummy_blobs_handle_;
};

class ConcurrentCacheReservationManager
@@ -282,6 +304,12 @@ class ConcurrentCacheReservationManager
return s;
}

inline Status UpdateBlobCacheReservation(
std::shared_ptr<Cache> blob_cache) override {
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
return cache_res_mgr_->UpdateBlobCacheReservation(blob_cache);
}

inline Status MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
3 changes: 2 additions & 1 deletion db/blob/blob_source.cc
Original file line number Diff line number Diff line change
@@ -21,7 +21,8 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options,
db_session_id_(db_session_id),
statistics_(immutable_options->statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache) {}
blob_cache_(immutable_options->blob_cache),
write_buffer_manager_(immutable_options->write_buffer_manager) {}

BlobSource::~BlobSource() = default;

41 changes: 38 additions & 3 deletions db/blob/blob_source.h
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
#include "db/blob/blob_file_cache.h"
#include "include/rocksdb/cache.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/cachable_entry.h"

namespace ROCKSDB_NAMESPACE {
@@ -68,9 +69,38 @@ class BlobSource {
size_t charge,
Cache::Handle** cache_handle,
Cache::Priority priority) const {
return blob_cache_->Insert(key, value, charge,
&DeleteCacheEntry<std::string>, cache_handle,
priority);
const Status s =
blob_cache_->Insert(key, value, charge, &DeleteCacheEntry<std::string>,
cache_handle, priority);
if (s.ok()) {
// The current blob cache is LRU or clock cache, and every insertion,
// whether induced by direct insertion or indirect eviction, will effect
// cache usage. So, following that, we try to report the most recent blob
// cache size.
ChargeCacheUsage();
}
return s;
}

// Report the memory usage of the blob cache against the global memory limit.
//
// To help service owners to manage their memory budget effectively, Write
// Buffer Manager has been working towards counting all major memory users
// inside RocksDB towards a single global memory limit. This global limit is
// specified by the capacity of the block-based table's block cache, and is
// technically implemented by inserting dummy entries ("reservations") into
// the block cache. Thus, the blob cache has one and only one dummy entry in
// the block cache, and its entry size is equal to the current blob cache
// memory usage.
//
// ChargeCacheUsage() is to update the size of the dummy entry in the block
// cache when a new blob is inserted into the blob cache or an old blob is
// evictioned.
inline void ChargeCacheUsage() const {
if (write_buffer_manager_ != nullptr &&
write_buffer_manager_->cost_to_cache()) {
write_buffer_manager_->ReserveMemWithBlobCache(blob_cache_);
}
}

const std::string db_id_;
@@ -83,6 +113,11 @@ class BlobSource {

// A cache to store uncompressed blobs.
std::shared_ptr<Cache> blob_cache_;

// Write buffer manager helps users control the total memory used by memtables
// across multiple column families and/or DB instances.
// https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager
std::shared_ptr<WriteBufferManager> write_buffer_manager_;
};

} // namespace ROCKSDB_NAMESPACE
4 changes: 4 additions & 0 deletions include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
@@ -152,6 +152,10 @@ class WriteBufferManager final {

void RemoveDBFromQueue(StallInterface* wbm_stall);

// Charge the memory usage of the blob cache when the backing cache of the
// blob cache and the block cache are different.
void ReserveMemWithBlobCache(std::shared_ptr<Cache> blob_cache);

private:
std::atomic<size_t> buffer_size_;
std::atomic<size_t> mutable_limit_;
12 changes: 12 additions & 0 deletions memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
@@ -199,4 +199,16 @@ void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
wbm_stall->Signal();
}

void WriteBufferManager::ReserveMemWithBlobCache(
std::shared_ptr<Cache> blob_cache) {
#ifndef ROCKSDB_LITE
assert(cache_res_mgr_ != nullptr);
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
Status s = cache_res_mgr_->UpdateBlobCacheReservation(blob_cache);
s.PermitUncheckedError();
#else
(void)blob_cache;
#endif // ROCKSDB_LITE
}

} // namespace ROCKSDB_NAMESPACE