Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jan 17, 2024
1 parent 78fd763 commit 0ff5efd
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 22 deletions.
10 changes: 6 additions & 4 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ Status CachedRemoteFileReader::close() {
return _remote_file_reader->close();
}

std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset,
size_t read_size, size_t length) {
std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, size_t read_size,
size_t length) {
size_t left = offset;
size_t right = offset + read_size - 1;
size_t align_left = (left / config::file_cache_each_block_size) * config::file_cache_each_block_size;
size_t align_right = (right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size;
size_t align_left =
(left / config::file_cache_each_block_size) * config::file_cache_each_block_size;
size_t align_right =
(right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size;
align_right = align_right < length ? align_right : length;
size_t align_size = align_right - align_left;
if (align_size < config::file_cache_each_block_size && align_left != 0) {
Expand Down
1 change: 0 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class CachedRemoteFileReader final : public FileReader {
const IOContext* io_ctx) override;

private:

bool _is_doris_table;
FileReaderSPtr _remote_file_reader;
UInt128Wrapper _cache_hash;
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class FileCacheStorage {
virtual Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) = 0;
// use when lazy load cache
virtual void load_blocks_directly_unlocked(BlockFileCacheManager* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) {}
std::lock_guard<std::mutex>& cache_lock) {}
};

} // namespace doris::io
8 changes: 7 additions & 1 deletion be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_reader.h"
#include "io/fs/local_file_writer.h"
#include "runtime/exec_env.h"
#include "vec/common/hex.h"

namespace doris::io {

FDCache* FDCache::instance() {
return ExecEnv::GetInstance()->file_cache_open_fd_cache();
}

std::shared_ptr<FileReader> FDCache::get_file_reader(const AccessKeyAndOffset& key) {
if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
return nullptr;
Expand Down Expand Up @@ -397,7 +402,8 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCacheManager* _mgr
LOG(WARNING) << "parse offset err, path=" << offset_it->path().native();
continue;
}
TEST_SYNC_POINT_CALLBACK("BlockFileCacheManager::REMOVE_FILE_2", &offset_with_suffix);
TEST_SYNC_POINT_CALLBACK("BlockFileCacheManager::REMOVE_FILE_2",
&offset_with_suffix);
size_t size = offset_it->file_size(ec);
if (ec) {
LOG(WARNING) << "failed to file_size: file_name=" << offset_with_suffix
Expand Down
5 changes: 1 addition & 4 deletions be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ namespace doris::io {

class FDCache {
public:
static FDCache* instance() {
static FDCache fd_cache;
return &fd_cache;
}
static FDCache* instance();

std::shared_ptr<FileReader> get_file_reader(const AccessKeyAndOffset& key);

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class FileSystem;
struct FileWriterOptions {
bool write_file_cache = false;
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage system
bool sync_file_data = true; // Whether flush data into storage system
uint64_t file_cache_expiration = 0; // Absolute time
};

Expand Down
5 changes: 2 additions & 3 deletions be/src/io/fs/s3_file_bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
#include "common/logging.h"
#include "common/status.h"
#include "common/sync_point.h"
#include "io/cache/file_cache_utils.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_utils.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "util/defer_op.h"
Expand Down Expand Up @@ -92,8 +92,7 @@ void UploadFileBuffer::set_index_offset(size_t offset) {
_index_offset = offset;
if (_holder) {
bool change_to_index_cache = false;
for (auto iter = _holder->file_blocks.begin(); iter != _holder->file_blocks.end();
++iter) {
for (auto iter = _holder->file_blocks.begin(); iter != _holder->file_blocks.end(); ++iter) {
if (iter == _cur_file_block) {
change_to_index_cache = true;
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@

#include "common/config.h"
#include "common/status.h"
#include "io/cache/block_file_cache_manager.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/block_file_cache_manager.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_utils.h"
#include "io/fs/err_utils.h"
Expand Down Expand Up @@ -270,7 +270,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
[cache = _cache, k = _cache_hash, offset = _bytes_appended,
t = _expiration_time, cold = _is_cold_data]() -> FileBlocksHolderPtr {
CacheContext ctx;
ctx.cache_type = t == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
ctx.cache_type =
t == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
ctx.expiration_time = t;
ctx.is_cold_data = cold;
auto holder = cache->get_or_set(k, offset,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
}

io::FileCacheSettings CachePath::init_settings() const {
return io::calc_settings(total_bytes, query_limit_bytes, normal_percent, disposable_percent, index_percent);
return io::calc_settings(total_bytes, query_limit_bytes, normal_percent, disposable_percent,
index_percent);
}

} // end namespace doris
5 changes: 5 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <vector>

#include "common/status.h"
#include "io/cache/fs_file_cache_storage.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
Expand All @@ -53,6 +54,7 @@ class TaskGroupManager;
}
namespace io {
class FileCacheFactory;
class FDCache;
} // namespace io
namespace segment_v2 {
class InvertedIndexSearcherCache;
Expand Down Expand Up @@ -273,6 +275,8 @@ class ExecEnv {
return _runtime_filter_timer_queue;
}

io::FDCache* file_cache_open_fd_cache() const { return _file_cache_open_fd_cache.get(); }

private:
ExecEnv();

Expand Down Expand Up @@ -374,6 +378,7 @@ class ExecEnv {
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;

// used for query with group cpu hard limit
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _global_block_scheduler;
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/block_file_cache_manager.h"
#include "io/fs/file_meta_cache.h"
Expand Down Expand Up @@ -225,6 +226,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);

_backend_client_cache->init_metrics("backend");
Expand Down Expand Up @@ -337,7 +339,7 @@ void ExecEnv::init_file_cache_factory() {
cache_path_set.emplace(cache_path.path);
}

for (std::thread& thread : file_cache_init_threads) {
for (std::thread& thread : file_cache_init_threads) {
if (thread.joinable()) {
thread.join();
}
Expand Down Expand Up @@ -446,7 +448,7 @@ Status ExecEnv::_init_mem_env() {
<< ", config file_cache_max_file_reader_cache_size is: "
<< config::file_cache_max_file_reader_cache_size;
config::file_cache_max_file_reader_cache_size = block_file_cache_fd_cache_size;

_file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);

_lookup_connection_cache = LookupConnectionCache::create_global_instance(
Expand Down
4 changes: 2 additions & 2 deletions be/test/io/cache/block_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// KIND, either express or implied. See the License for the∂∂∂
// specific language governing permissions and limitations
// under the License.
// This file is copied from
Expand Down Expand Up @@ -3598,7 +3598,7 @@ TEST_F(BlockFileCacheTest, test_align_size) {
}

TEST_F(BlockFileCacheTest, remove_if_cached_when_isnt_releasable) {
if (fs::exists(cache_base_path)) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
fs::create_directories(cache_base_path);
Expand Down

0 comments on commit 0ff5efd

Please sign in to comment.