Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve](cache) File cache async init #39036

Merged
merged 9 commits into from
Aug 15, 2024
Merged
49 changes: 36 additions & 13 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,33 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
Status LRUFileCache::initialize() {
    MonotonicStopWatch watch;
    watch.start();
    std::lock_guard cache_lock(_mutex);
    if (!_is_initialized) {
        if (fs::exists(_cache_base_path)) {
            // The cache directory already exists: load its metadata
            // asynchronously so process startup is not blocked by a
            // potentially large directory scan. Until the load thread sets
            // _lazy_open_done, get_or_set() serves SKIP_CACHE blocks.
            // (The old synchronous load_cache_info_into_memory() call here
            // was replaced by this background thread; do not call both.)
            _lazy_open_done = false;
            _cache_background_load_thread = std::thread([this]() {
                MonotonicStopWatch watch;
                watch.start();
                // Blocks until initialize() releases _mutex, then holds the
                // lock for the entire load so no reader sees partial state.
                std::lock_guard<std::mutex> cache_lock(_mutex);
                Status s = load_cache_info_into_memory(cache_lock);
                if (s.ok()) {
                    _lazy_open_done = true;
                } else {
                    // NOTE(review): on failure _lazy_open_done stays false, so
                    // the cache is skipped forever -- confirm this degraded
                    // mode is intended rather than a retry.
                    LOG(WARNING) << fmt::format("Failed to load cache info from {}: {}",
                                                _cache_base_path, s.to_string());
                }
                int64_t cost = watch.elapsed_time() / 1000 / 1000;
                LOG(INFO) << fmt::format(
                        "FileCache lazy load done path={}, disposable queue size={} elements={}, "
                        "index queue size={} elements={}, query queue size={} elements={}, init "
                        "cost(ms)={}",
                        _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
                        _disposable_queue.get_elements_num(cache_lock),
                        _index_queue.get_total_cache_size(cache_lock),
                        _index_queue.get_elements_num(cache_lock),
                        _normal_queue.get_total_cache_size(cache_lock),
                        _normal_queue.get_elements_num(cache_lock), cost);
            });
        } else {
            // First run on this path: create the directory tree; ec is
            // checked in the (collapsed) lines that follow.
            std::error_code ec;
            fs::create_directories(_cache_base_path, ec);
Expand All @@ -136,17 +159,7 @@ Status LRUFileCache::initialize() {
_is_initialized = true;
_cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this);
int64_t cost = watch.elapsed_time() / 1000 / 1000;
// Queue sizes are now logged by the async load thread when it finishes; here
// we only report init latency. The format string has two placeholders, so
// both the path and the cost must be passed (passing only `cost` makes
// fmt::format throw fmt::format_error at runtime).
LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}",
                         _cache_base_path, cost);
return Status::OK();
}

Expand Down Expand Up @@ -376,6 +389,16 @@ void LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co

FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size,
const CacheContext& context) {
if (!_lazy_open_done) {
    // The async load started in initialize() has not finished: hand back a
    // SKIP_CACHE block so the read bypasses the cache instead of racing with
    // load_cache_info_into_memory(). Use fmt::format, not std::format, for
    // consistency with the rest of this file (std::format requires C++20
    // library support and was flagged in review).
    LOG(WARNING) << fmt::format(
            "Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.",
            key.to_string(), offset, size);
    FileBlocks file_blocks = {std::make_shared<FileBlock>(
            offset, size, key, this, FileBlock::State::SKIP_CACHE, context.cache_type)};
    return FileBlocksHolder(std::move(file_blocks));
}

FileBlock::Range range(offset, offset + size - 1);

std::lock_guard cache_lock(_mutex);
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/block/block_lru_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class LRUFileCache final : public IFileCache {
LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
~LRUFileCache() override {
    _close = true;
    // Join the async metadata-load thread. The original joined
    // _cache_background_thread here (copy-paste), which left
    // _cache_background_load_thread joinable at destruction -- a joinable
    // std::thread member calls std::terminate() when destroyed.
    if (_cache_background_load_thread.joinable()) {
        _cache_background_load_thread.join();
    }
    if (_cache_background_thread.joinable()) {
        _cache_background_thread.join();
    }
Expand Down Expand Up @@ -201,6 +204,8 @@ class LRUFileCache final : public IFileCache {
private:
std::atomic_bool _close {false};
std::thread _cache_background_thread;
std::atomic_bool _lazy_open_done {true};
std::thread _cache_background_load_thread;
size_t _num_read_segments = 0;
size_t _num_hit_segments = 0;
size_t _num_removed_segments = 0;
Expand Down
Loading