Skip to content

Commit

Permalink
[enhancement](cloud) optimize block cache lock (apache#41818)
Browse files Browse the repository at this point in the history
pick apache#41818 from master

1. async deletion when do stale rowsets reclycle
2. minimize lock critical size
3. add cache lock held & wait time info for debug
  • Loading branch information
freemandealer committed Nov 7, 2024
1 parent e682fa2 commit 6fdb52c
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 42 deletions.
4 changes: 4 additions & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ if (ENABLE_INJECTION_POINT)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DENABLE_INJECTION_POINT")
endif()

if (ENABLE_CACHE_LOCK_DEBUG)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DENABLE_CACHE_LOCK_DEBUG")
endif()

# Enable memory tracker, which allows BE to limit the memory of tasks such as query, load,
# and compaction,and observe the memory of BE through be_ip:http_port/MemTracker.
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker,
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset
// TODO: Segment::file_cache_key
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached(file_key);
file_cache->remove_if_cached_async(file_key);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,9 @@ DEFINE_mBool(enable_query_like_bloom_filter, "true");
DEFINE_Int32(doris_remote_scanner_thread_pool_thread_num, "48");
// number of s3 scanner thread pool queue size
DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
DEFINE_mInt64(block_cache_wait_timeout_ms, "1000");
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");

// limit the queue of pending batches which will be sent by a single nodechannel
DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,9 @@ DECLARE_mInt64(nodechannel_pending_queue_max_bytes);

// The batch size for sending data by brpc streaming client
DECLARE_mInt64(brpc_streaming_client_batch_bytes);
DECLARE_mInt64(block_cache_wait_timeout_ms);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);

DECLARE_Bool(enable_brpc_builtin_services);

Expand Down
106 changes: 81 additions & 25 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
_ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
std::numeric_limits<int>::max());

_recycle_keys = std::make_shared<boost::lockfree::spsc_queue<FileCacheKey>>(
config::file_cache_recycle_keys_size);
if (cache_settings.storage == "memory") {
_storage = std::make_unique<MemFileCacheStorage>();
_cache_base_path = "memory";
Expand Down Expand Up @@ -251,8 +253,7 @@ FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) {

BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
const TUniqueId& query_id) {
std::lock_guard cache_lock(_mutex);

SCOPED_CACHE_LOCK(_mutex);
if (!config::enable_file_cache_query_limit) {
return {};
}
Expand All @@ -270,7 +271,7 @@ BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
}

void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
const auto& query_iter = _query_map.find(query_id);

if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
Expand Down Expand Up @@ -315,7 +316,7 @@ void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash,
}

Status BlockFileCache::initialize() {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
return initialize_unlocked(cache_lock);
}

Expand Down Expand Up @@ -522,7 +523,7 @@ std::string BlockFileCache::clear_file_cache_async() {
int64_t num_cells_to_delete = 0;
int64_t num_files_all = 0;
{
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
if (!_async_clear_file_cache) {
for (auto& [_, offset_to_cell] : _files) {
++num_files_all;
Expand Down Expand Up @@ -758,7 +759,7 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o
CacheContext& context) {
FileBlock::Range range(offset, offset + size - 1);

std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
if (auto iter = _key_to_time.find(hash);
context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) {
context.cache_type = FileCacheType::TTL;
Expand Down Expand Up @@ -834,7 +835,7 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha
}

size_t BlockFileCache::try_release() {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
std::vector<FileBlockCell*> trash;
for (auto& [hash, blocks] : _files) {
for (auto& [offset, cell] : blocks) {
Expand Down Expand Up @@ -899,6 +900,18 @@ void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}

void BlockFileCache::remove_file_blocks_async(std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, /*sync*/ false);
}
};
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}

void BlockFileCache::remove_file_blocks_and_clean_time_maps(
std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) {
Expand Down Expand Up @@ -1184,7 +1197,7 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b
}

void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock);
if (!is_ttl_file) {
auto iter = _files.find(file_key);
Expand All @@ -1200,6 +1213,23 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
}
}

void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
SCOPED_CACHE_LOCK(_mutex);
bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock);
if (!is_ttl_file) {
auto iter = _files.find(file_key);
std::vector<FileBlockCell*> to_remove;
if (iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
if (cell.releasable()) {
to_remove.push_back(&cell);
}
}
}
remove_file_blocks_async(to_remove, cache_lock);
}
}

std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
FileCacheType cur_cache_type) {
switch (cur_cache_type) {
Expand Down Expand Up @@ -1383,7 +1413,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,

template <class T, class U>
requires IsXLock<T> && IsXLock<U>
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock) {
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
auto hash = file_block->get_hash_value();
auto offset = file_block->offset();
auto type = file_block->cache_type();
Expand All @@ -1403,9 +1433,24 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
key.offset = offset;
key.meta.type = type;
key.meta.expiration_time = expiration_time;
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
if (sync) {
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
} else {
// the file will be deleted in the bottom half
// so there will be a window that the file is not in the cache but still in the storage
// but it's ok, because the rowset is stale already
// in case something unexpected happen, set the _recycle_keys queue to zero to fallback
bool ret = _recycle_keys->push(key);
if (!ret) {
LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
}
}
_cur_cache_size -= file_block->range().size();
Expand All @@ -1420,8 +1465,18 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
*_num_removed_blocks << 1;
}

void BlockFileCache::recycle_stale_rowset_async_bottom_half() {
FileCacheKey key;
while (_recycle_keys->pop(key)) {
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
}

size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
return get_used_cache_size_unlocked(cache_type, cache_lock);
}

Expand All @@ -1431,7 +1486,7 @@ size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
}

size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
return get_available_cache_size_unlocked(cache_type, cache_lock);
}

Expand All @@ -1442,7 +1497,7 @@ size_t BlockFileCache::get_available_cache_size_unlocked(
}

size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
return get_file_blocks_num_unlocked(cache_type, cache_lock);
}

Expand Down Expand Up @@ -1526,7 +1581,7 @@ std::string BlockFileCache::LRUQueue::to_string(
}

std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
return dump_structure_unlocked(hash, cache_lock);
}

Expand All @@ -1544,7 +1599,7 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
}

std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
return dump_single_cache_type_unlocked(hash, offset, cache_lock);
}

Expand Down Expand Up @@ -1607,7 +1662,7 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) {
ss << "finish reset_capacity, path=" << _cache_base_path;
auto start_time = steady_clock::time_point();
{
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
int64_t need_remove_size = _cur_cache_size - new_capacity;
auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
Expand Down Expand Up @@ -1719,10 +1774,11 @@ void BlockFileCache::run_background_operation() {
break;
}
}
recycle_stale_rowset_async_bottom_half();
recycle_deleted_blocks();
// gc
int64_t cur_time = UnixSeconds();
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
while (!_time_to_key.empty()) {
auto begin = _time_to_key.begin();
if (cur_time < begin->first) {
Expand Down Expand Up @@ -1768,7 +1824,7 @@ void BlockFileCache::run_background_operation() {

void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
// 1. If new_expiration_time is equal to zero
if (new_expiration_time == 0) {
remove_if_ttl_file_unlock(hash, false, cache_lock);
Expand Down Expand Up @@ -1828,7 +1884,7 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& pair : _files.find(hash)->second) {
Expand Down Expand Up @@ -1897,7 +1953,7 @@ std::string BlockFileCache::clear_file_cache_directly() {
using namespace std::chrono;
std::stringstream ss;
auto start = steady_clock::now();
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);

std::string clear_msg;
Expand Down Expand Up @@ -1935,7 +1991,7 @@ std::string BlockFileCache::clear_file_cache_directly() {

std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
std::map<size_t, FileBlockSPtr> offset_to_block;
std::lock_guard cache_lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
if (_files.contains(hash)) {
for (auto& [offset, cell] : _files[hash]) {
if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
Expand All @@ -1950,7 +2006,7 @@ std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128W
}

void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
std::lock_guard lock(_mutex);
SCOPED_CACHE_LOCK(_mutex);
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
cell.update_atime();
Expand Down Expand Up @@ -2024,5 +2080,5 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() {

template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock);
std::lock_guard<std::mutex>& block_lock, bool sync);
} // namespace doris::io
Loading

0 comments on commit 6fdb52c

Please sign in to comment.