Skip to content

Commit

Permalink
(cloud-merge) Supports online capacity expansion and contraction
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 25, 2024
1 parent f6986e1 commit 30b8959
Show file tree
Hide file tree
Showing 35 changed files with 598 additions and 174 deletions.
40 changes: 0 additions & 40 deletions be/src/http/action/clear_file_cache_action.cpp

This file was deleted.

32 changes: 0 additions & 32 deletions be/src/http/action/clear_file_cache_action.h

This file was deleted.

54 changes: 44 additions & 10 deletions be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,59 @@

namespace doris {

const static std::string HEADER_JSON = "application/json";
const static std::string OP = "op";
constexpr static std::string_view HEADER_JSON = "application/json";
constexpr static std::string_view OP = "op";
constexpr static std::string_view SYNC = "sync";
constexpr static std::string_view PATH = "path";
constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
constexpr static std::string_view RELEASED_ELEMENTS = "released_elements";

Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
std::string operation = req->param(OP);
if (operation == "release") {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
std::string operation = req->param(OP.data());
Status st = Status::OK();
if (operation == RELEASE) {
size_t released = 0;
if (req->param("base_path") != "") {
released = io::FileCacheFactory::instance()->try_release(req->param("base_path"));
const std::string& base_path = req->param(BASE_PATH.data());
if (!base_path.empty()) {
released = io::FileCacheFactory::instance()->try_release(base_path);
} else {
released = io::FileCacheFactory::instance()->try_release();
}
EasyJson json;
json["released_elements"] = released;
json[RELEASED_ELEMENTS.data()] = released;
*json_metrics = json.ToString();
return Status::OK();
} else if (operation == CLEAR) {
const std::string& sync = req->param(SYNC.data());
auto ret = io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
} else if (operation == RESET) {
Status st;
std::string capacity = req->param(CAPACITY.data());
int64_t new_capacity = 0;
bool parse = true;
try {
new_capacity = std::stoll(capacity);
} catch (...) {
parse = false;
}
if (!parse || new_capacity <= 0) {
st = Status::InvalidArgument(
"The capacity {} failed to be parsed, the capacity needs to be in "
"the interval (0, INT64_MAX]",
capacity);
} else {
const std::string& path = req->param(PATH.data());
auto ret = io::FileCacheFactory::instance()->reset_capacity(path, new_capacity);
LOG(INFO) << ret;
}
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
return Status::InternalError("invalid operation: {}", operation);
return st;
}

void FileCacheAction::handle(HttpRequest* req) {
Expand Down
69 changes: 65 additions & 4 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1408,11 +1408,72 @@ int disk_used_percentage(const std::string& path, std::pair<int, int>* percent)
return 0;
}

void BlockFileCache::check_disk_resource_limit(const std::string& path) {
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
using namespace std::chrono;
int64_t space_released = 0;
size_t old_capacity = 0;
std::stringstream ss;
ss << "finish reset_capacity, path=" << _cache_base_path;
auto start_time = steady_clock::time_point();
{
std::lock_guard cache_lock(_mutex);
if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
int64_t need_remove_size = _cur_cache_size - new_capacity;
auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
int64_t queue_released = 0;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (need_remove_size <= 0) return queue_released;
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
if (!cell->releasable()) continue;
cell->is_deleted = true;
need_remove_size -= entry_size;
space_released += entry_size;
queue_released += entry_size;
}
return queue_released;
};
int64_t queue_released = remove_blocks(_disposable_queue);
ss << " disposable_queue released " << queue_released;
queue_released = remove_blocks(_normal_queue);
ss << " normal_queue released " << queue_released;
queue_released = remove_blocks(_index_queue);
ss << " index_queue released " << queue_released;
if (need_remove_size >= 0) {
queue_released = 0;
for (auto& [_, key] : _time_to_key) {
for (auto& [_, cell] : _files[key]) {
if (need_remove_size <= 0) break;
cell.is_deleted = true;
need_remove_size -= cell.file_block->range().size();
space_released += cell.file_block->range().size();
queue_released += cell.file_block->range().size();
}
}
ss << " ttl_queue released " << queue_released;
}
_disk_resource_limit_mode = true;
_async_clear_file_cache = true;
ss << " total_space_released=" << space_released;
}
old_capacity = _capacity;
_capacity = new_capacity;
}
auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
<< " use_time=" << static_cast<int64_t>(use_time.count());
ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
LOG(INFO) << ss.str();
return ss.str();
}

void BlockFileCache::check_disk_resource_limit() {
if (_capacity > _cur_cache_size) {
_disk_resource_limit_mode = false;
}
std::pair<int, int> percent;
int ret = disk_used_percentage(path, &percent);
int ret = disk_used_percentage(_cache_base_path, &percent);
if (ret != 0) {
LOG_ERROR("").tag("file cache path", path).tag("error", strerror(errno));
LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
return;
}
auto [capacity_percentage, inode_percentage] = percent;
Expand Down Expand Up @@ -1452,7 +1513,7 @@ void BlockFileCache::run_background_operation() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
check_disk_resource_limit(_cache_base_path);
check_disk_resource_limit();
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
Expand Down
10 changes: 9 additions & 1 deletion be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ class BlockFileCache {
*/
std::string clear_file_cache_async();
std::string clear_file_cache_directly();

/**
* Reset the cache capacity. If the new_capacity is smaller than _capacity, the redundant data will be remove async.
*
* @returns summary message
*/
std::string reset_capacity(size_t new_capacity);

std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
/// For debug.
std::string dump_structure(const UInt128Wrapper& hash);
Expand Down Expand Up @@ -358,7 +366,7 @@ class BlockFileCache {
size_t get_used_cache_size_unlocked(FileCacheType type,
std::lock_guard<std::mutex>& cache_lock) const;

void check_disk_resource_limit(const std::string& path);
void check_disk_resource_limit();

size_t get_available_cache_size_unlocked(FileCacheType type,
std::lock_guard<std::mutex>& cache_lock) const;
Expand Down
19 changes: 17 additions & 2 deletions be/src/io/cache/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
size_t disk_capacity = static_cast<size_t>(
static_cast<size_t>(stat.f_blocks) * static_cast<size_t>(stat.f_bsize) *
(static_cast<double>(config::file_cache_enter_disk_resource_limit_mode_percent) / 100));
if (disk_capacity < file_cache_settings.capacity) {
LOG_INFO("The cache {} config size {} is larger than {}% disk size {}, recalc it.",
if (file_cache_settings.capacity == 0 || disk_capacity < file_cache_settings.capacity) {
LOG_INFO("The cache {} config size {} is larger than {}% disk size {} or zero, recalc it.",
cache_base_path, file_cache_settings.capacity,
config::file_cache_enter_disk_resource_limit_mode_percent, disk_capacity);
file_cache_settings =
Expand Down Expand Up @@ -143,5 +143,20 @@ std::vector<std::string> FileCacheFactory::get_base_paths() {
return paths;
}

std::string FileCacheFactory::reset_capacity(const std::string& path, int64_t new_capacity) {
if (path.empty()) {
std::stringstream ss;
for (auto& [_, cache] : _path_to_cache) {
ss << cache->reset_capacity(new_capacity);
}
return ss.str();
} else {
if (auto iter = _path_to_cache.find(path); iter != _path_to_cache.end()) {
return iter->second->reset_capacity(new_capacity);
}
}
return "Unknown the cache path " + path;
}

} // namespace io
} // namespace doris
9 changes: 9 additions & 0 deletions be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ class FileCacheFactory {

std::vector<std::string> get_base_paths();

/**
* Clears data of all file cache instances
*
* @param path file cache absolute path
* @param new_capacity
* @return summary message
*/
std::string reset_capacity(const std::string& path, int64_t new_capacity);

FileCacheFactory() = default;
FileCacheFactory& operator=(const FileCacheFactory&) = delete;
FileCacheFactory(const FileCacheFactory&) = delete;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach
size_t normal_percent, size_t disposable_percent,
size_t index_percent) {
io::FileCacheSettings settings;
if (capacity == 0) return settings;
settings.capacity = capacity;
settings.max_file_block_size = config::file_cache_each_block_size;
settings.max_query_cache_size = max_query_cache_size;
Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
if (value.IsInt64()) {
total_size = value.GetInt64();
} else {
return Status::InvalidArgument("total_size should be int64");
total_size = 0;
}
}
if (config::enable_file_cache_query_limit) {
Expand All @@ -230,13 +230,12 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
if (value.IsInt64()) {
query_limit_bytes = value.GetInt64();
} else {
return Status::InvalidArgument("query_limit should be int64");
query_limit_bytes = 0;
}
}
}
if (total_size <= 0 || (config::enable_file_cache_query_limit && query_limit_bytes <= 0)) {
return Status::InvalidArgument(
"total_size or query_limit should not less than or equal to zero");
if (total_size < 0 || (config::enable_file_cache_query_limit && query_limit_bytes < 0)) {
return Status::InvalidArgument("total_size or query_limit should not less than zero");
}

// percent
Expand Down
14 changes: 4 additions & 10 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "http/action/check_tablet_segment_action.h"
#include "http/action/checksum_action.h"
#include "http/action/clear_cache_action.h"
#include "http/action/clear_file_cache_action.h"
#include "http/action/compaction_action.h"
#include "http/action/config_action.h"
#include "http/action/debug_point_action.h"
Expand Down Expand Up @@ -208,9 +207,6 @@ Status HttpService::start() {
_pool.add(new MetaAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/meta/{op}/{tablet_id}", meta_action);

FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action);

ConfigAction* update_config_action =
_pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG));
_ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action);
Expand Down Expand Up @@ -303,9 +299,8 @@ void HttpService::register_local_handler(StorageEngine& engine) {
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_binlog/_download",
download_binlog_action);

ClearFileCacheAction* clear_file_cache_action = _pool.add(new ClearFileCacheAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/clear_file_cache",
clear_file_cache_action);
FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/file_cache", file_cache_action);

TabletsDistributionAction* tablets_distribution_action =
_pool.add(new TabletsDistributionAction(_env, engine, TPrivilegeHier::GLOBAL,
Expand Down Expand Up @@ -400,9 +395,8 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
_ev_http_server->register_handler(HttpMethod::GET, "/api/injection_point/{op}",
injection_point_action);
#endif
ClearFileCacheAction* clear_file_cache_action = _pool.add(new ClearFileCacheAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/clear_file_cache",
clear_file_cache_action);
FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action);
auto* show_hotspot_action = _pool.add(new ShowHotspotAction(engine));
_ev_http_server->register_handler(HttpMethod::GET, "/api/hotspot/tablet", show_hotspot_action);

Expand Down
Loading

0 comments on commit 30b8959

Please sign in to comment.