
Commit

tmp
Lchangliang committed Jan 17, 2024
1 parent bd45647 commit 9e21eef
Showing 7 changed files with 24 additions and 8 deletions.
6 changes: 2 additions & 4 deletions be/src/common/config.cpp
@@ -985,9 +985,7 @@ DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
DEFINE_String(file_cache_path, "");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB
DEFINE_Validator(file_cache_each_block_size, [](const int64_t config) -> bool {
return config <= config::s3_write_buffer_size && config::s3_write_buffer_size % config == 0;
});

DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
@@ -1035,7 +1033,7 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
// Page size of row column, default 4KB
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt32(s3_write_buffer_size, "5242880");
DEFINE_mInt64(s3_write_buffer_size, "5242880");
// The timeout config for S3 buffer allocation
DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
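
The deleted DEFINE_Validator tied file_cache_each_block_size to s3_write_buffer_size: the block size must not exceed the S3 write buffer and must divide it evenly. The same constraint is now checked once at startup in exec_env_init.cpp (see that file's hunk below). A minimal standalone sketch of the relationship, using only the default values visible in this hunk; it is not code from the commit:

```cpp
#include <cstdint>

int main() {
    // Defaults from config.cpp in this commit.
    constexpr int64_t file_cache_each_block_size = 1048576; // 1 MiB
    constexpr int64_t s3_write_buffer_size = 5242880;       // 5 MiB

    // The constraint the removed validator (and the new startup check) expresses:
    // 1 MiB <= 5 MiB, and 5242880 % 1048576 == 0 (exactly 5 blocks per buffer).
    static_assert(file_cache_each_block_size <= s3_write_buffer_size,
                  "cache block size must not exceed the S3 write buffer");
    static_assert(s3_write_buffer_size % file_cache_each_block_size == 0,
                  "S3 write buffer must be a whole number of cache blocks");
    return 0;
}
```
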
2 changes: 1 addition & 1 deletion be/src/common/config.h
@@ -1077,7 +1077,7 @@ DECLARE_mInt32(tablet_path_check_batch_size);
// Page size of row column, default 4KB
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt32(s3_write_buffer_size);
DECLARE_mInt64(s3_write_buffer_size);
// The timeout config for S3 buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
// the max number of cached file handles for block segments
10 changes: 7 additions & 3 deletions be/src/io/fs/local_file_reader.cpp
@@ -19,6 +19,7 @@

#include <bthread/bthread.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <bvar/bvar.h>
#include <errno.h> // IWYU pragma: keep
#include <fmt/format.h>
#include <glog/logging.h>
@@ -40,11 +41,14 @@ namespace doris {
namespace io {
struct IOContext;

bvar::Adder<uint64_t> local_file_open_reading("doris_be_local_file_open_reading");
bvar::Adder<uint64_t> local_file_reader_total("doris_be_local_file_reader_total");

LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd,
std::shared_ptr<LocalFileSystem> fs)
: _fd(fd), _path(std::move(path)), _file_size(file_size), _fs(std::move(fs)) {
DorisMetrics::instance()->local_file_open_reading->increment(1);
DorisMetrics::instance()->local_file_reader_total->increment(1);
local_file_open_reading << 1;
local_file_reader_total << 1;
}

LocalFileReader::~LocalFileReader() {
@@ -54,7 +58,7 @@ LocalFileReader::~LocalFileReader() {
Status LocalFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
DorisMetrics::instance()->local_file_open_reading->increment(-1);
local_file_open_reading << -1;
DCHECK(bthread_self() == 0);
if (-1 == ::close(_fd)) {
std::string err = errno_to_str();
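
The reader metrics move from DorisMetrics counters to process-global bvar adders registered under the names shown above. A small usage sketch, assuming bvar from brpc behaves as in its public API; the variable names below are placeholders, not the ones the diff registers:

```cpp
#include <bvar/bvar.h>

#include <cstdint>
#include <iostream>

// Placeholder names; the commit registers "doris_be_local_file_open_reading"
// and "doris_be_local_file_reader_total".
bvar::Adder<uint64_t> open_reading("example_local_file_open_reading");
bvar::Adder<uint64_t> reader_total("example_local_file_reader_total");

int main() {
    open_reading << 1;  // constructor: one more reader currently open
    reader_total << 1;  // constructor: lifetime total only grows
    open_reading << -1; // close(): open count drops back (unsigned sum wraps to 0)
    std::cout << open_reading.get_value() << " open, "
              << reader_total.get_value() << " total" << std::endl;
    return 0;
}
```

If the BE exposes brpc's /vars page, exposed adders appear there by name, which presumably removes the need to route these two counters through DorisMetrics.
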
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_bufferpool.cpp
@@ -17,6 +17,8 @@

#include "s3_file_bufferpool.h"

#include <bvar/bvar.h>

#include <chrono>
#include <memory>

1 change: 1 addition & 0 deletions be/src/runtime/exec_env.h
@@ -105,6 +105,7 @@ class RowCache;
class DummyLRUCache;
class CacheManager;
class WalManager;
class DorisMetrics;

inline bool k_doris_exit = false;

10 changes: 10 additions & 0 deletions be/src/runtime/exec_env_init.cpp
@@ -314,6 +314,15 @@ Status ExecEnv::init_pipeline_task_scheduler() {
void ExecEnv::init_file_cache_factory() {
// Load file cache before starting up daemon threads to make sure StorageEngine is ready.
if (doris::config::enable_file_cache) {
if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
LOG_FATAL(
"The config file_cache_each_block_size {} must less than or equal to config "
"s3_write_buffer_size {} and config::s3_write_buffer_size % "
"config::file_cache_each_block_size must be zero",
config::file_cache_each_block_size, config::s3_write_buffer_size);
exit(-1);
}
std::unordered_set<std::string> cache_path_set;
std::vector<doris::CachePath> cache_paths;
Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
@@ -617,6 +626,7 @@ void ExecEnv::destroy() {
_buffered_reader_prefetch_thread_pool.reset(nullptr);
_s3_file_upload_thread_pool.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
_file_cache_open_fd_cache.reset(nullptr);

SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
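
Together with the config.cpp hunk, this moves the cross-config check from a DEFINE_Validator to a one-time startup guard, presumably because a validator on file_cache_each_block_size can run before s3_write_buffer_size has been parsed. A sketch of the same guard factored into a testable helper; the function name is illustrative and not part of the commit:

```cpp
#include <cstdint>

// Illustrative helper mirroring the new startup check: the cache block size must
// not exceed the S3 write buffer and must divide it evenly (presumably so each
// write buffer splits into a whole number of file-cache blocks).
bool valid_file_cache_block_size(int64_t block_size, int64_t s3_buffer_size) {
    return block_size > 0 && block_size <= s3_buffer_size &&
           s3_buffer_size % block_size == 0;
}
```
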
1 change: 1 addition & 0 deletions be/test/io/cache/block_file_cache_test.cpp
@@ -128,6 +128,7 @@ class BlockFileCacheTest : public testing::Test {
ASSERT_TRUE(writer->close().ok());
}
ExecEnv::GetInstance()->_file_cache_factory = factory.get();
ExecEnv::GetInstance()->_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
}
static void TearDownTestSuite() {
config::file_cache_enter_disk_resource_limit_mode_percent = 90;