Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jun 19, 2024
1 parent 1bb4f14 commit 2273279
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 45 deletions.
26 changes: 26 additions & 0 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,32 @@
#include "runtime/exec_env.h"

namespace doris {
template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool _use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(), _size(b), _capacity(b), _page_type(page_type) {
if (_use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(_page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
} else {
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
}

template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
if (_use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(_page_type));
TAllocator::free(_data, _capacity);
} else {
TAllocator::free(_data, _capacity);
}
}
}

StoragePageCache* StoragePageCache::create_global_cache(size_t capacity,
int32_t index_cache_percentage,
int64_t pk_index_cache_capacity,
Expand Down
20 changes: 4 additions & 16 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,10 @@ template <typename TAllocator>
class PageBase : private TAllocator, public LRUCacheValueBase {
public:
PageBase() = default;

PageBase(size_t b, const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: LRUCacheValueBase(), _size(b), _capacity(b), _mem_tracker_by_allocator(mem_tracker) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}

PageBase(size_t b, bool _use_cache, segment_v2::PageTypePB page_type);
PageBase(const PageBase&) = delete;
PageBase& operator=(const PageBase&) = delete;

~PageBase() override {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
TAllocator::free(_data, _capacity);
}
}
~PageBase() override;

char* data() { return _data; }
size_t size() { return _size; }
Expand All @@ -73,7 +60,8 @@ class PageBase : private TAllocator, public LRUCacheValueBase {
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
bool _use_cache;
segment_v2::PageTypePB _page_type;
};

using DataPage = PageBase<Allocator<false>>;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder {
* @return Status
*/
Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) override {
bool _use_cache, segment_v2::PageTypePB page_type) override {
size_t num_elements, compressed_size, num_element_after_padding;
int size_of_element;

Expand Down Expand Up @@ -67,7 +67,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder {
decoded_slice.size = size_of_dict_header + BITSHUFFLE_PAGE_HEADER_SIZE +
num_element_after_padding * size_of_element + size_of_tail;
std::unique_ptr<DataPage> decoded_page =
std::make_unique<DataPage>(decoded_slice.size, mem_tracker);
std::make_unique<DataPage>(decoded_slice.size, _use_cache, page_type);
decoded_slice.data = decoded_page->data();

if constexpr (USED_IN_DICT_ENCODING) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/encoding_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum EncodingTypePB : int;
class DataPagePreDecoder {
public:
virtual Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) = 0;
bool _use_cache, segment_v2::PageTypePB page_type) = 0;
virtual ~DataPagePreDecoder() = default;
};

Expand Down
14 changes: 4 additions & 10 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,9 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
opts.file_reader->path().native());
}

std::shared_ptr<MemTrackerLimiter> page_mem_tracker;
if (opts.use_page_cache && cache) {
page_mem_tracker = cache->mem_tracker(opts.type);
} else {
page_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}

// hold compressed page at first, reset to decompressed page later
std::unique_ptr<DataPage> page = std::make_unique<DataPage>(page_size, page_mem_tracker);
std::unique_ptr<DataPage> page =
std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
Slice page_slice(page->data(), page_size);
{
SCOPED_RAW_TIMER(&opts.stats->io_ns);
Expand Down Expand Up @@ -190,7 +184,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
}
SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
footer->uncompressed_size() + footer_size + 4, page_mem_tracker);
footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

// decompress page body
Slice compressed_body(page_slice.data, body_size);
Expand Down Expand Up @@ -218,7 +212,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
if (pre_decoder) {
RETURN_IF_ERROR(pre_decoder->decode(
&page, &page_slice, footer->data_page_footer().nullmap_size() + footer_size + 4,
page_mem_tracker));
opts.use_page_cache, opts.type));
}
}

Expand Down
32 changes: 16 additions & 16 deletions be/src/runtime/memory/lru_cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class LRUCachePolicyTrackingAllocator : public LRUCachePolicy {
bool enable_prune = true)
: LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards,
element_count_capacity, enable_prune) {
init_mem_tracker(lru_cache_type_string(lru_cache_type));
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}

LRUCachePolicyTrackingAllocator(CacheType type, size_t capacity, LRUCacheType lru_cache_type,
Expand All @@ -222,17 +222,11 @@ class LRUCachePolicyTrackingAllocator : public LRUCachePolicy {
: LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards,
element_count_capacity, cache_value_time_extractor,
cache_value_check_timestamp, enable_prune) {
init_mem_tracker(lru_cache_type_string(lru_cache_type));
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}

~LRUCachePolicyTrackingAllocator() override { reset_cache(); }

void init_mem_tracker(const std::string& type_name) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::GLOBAL,
fmt::format("{}[{}](AllocByAllocator)", type_string(_type), type_name));
}

std::shared_ptr<MemTrackerLimiter> mem_tracker() const {
DCHECK(_mem_tracker != nullptr);
return _mem_tracker;
Expand All @@ -249,6 +243,12 @@ class LRUCachePolicyTrackingAllocator : public LRUCachePolicy {
}

protected:
void _init_mem_tracker(const std::string& type_name) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::GLOBAL,
fmt::format("{}[{}](AllocByAllocator)", type_string(_type), type_name));
}

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

Expand All @@ -261,7 +261,7 @@ class LRUCachePolicyTrackingManual : public LRUCachePolicy {
bool enable_prune = true)
: LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards,
element_count_capacity, enable_prune) {
init_mem_tracker(lru_cache_type_string(lru_cache_type));
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}

LRUCachePolicyTrackingManual(CacheType type, size_t capacity, LRUCacheType lru_cache_type,
Expand All @@ -272,17 +272,11 @@ class LRUCachePolicyTrackingManual : public LRUCachePolicy {
: LRUCachePolicy(type, capacity, lru_cache_type, stale_sweep_time_s, num_shards,
element_count_capacity, cache_value_time_extractor,
cache_value_check_timestamp, enable_prune) {
init_mem_tracker(lru_cache_type_string(lru_cache_type));
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}

~LRUCachePolicyTrackingManual() override { reset_cache(); }

void init_mem_tracker(const std::string& type_name) {
_mem_tracker =
std::make_unique<MemTracker>(fmt::format("{}[{}]", type_string(_type), type_name),
ExecEnv::GetInstance()->details_mem_tracker_set());
}

MemTracker* mem_tracker() {
DCHECK(_mem_tracker != nullptr);
return _mem_tracker.get();
Expand All @@ -306,6 +300,12 @@ class LRUCachePolicyTrackingManual : public LRUCachePolicy {
}

private:
void _init_mem_tracker(const std::string& type_name) {
_mem_tracker =
std::make_unique<MemTracker>(fmt::format("{}[{}]", type_string(_type), type_name),
ExecEnv::GetInstance()->details_mem_tracker_set());
}

// LRUCacheType::SIZE equal to total_size.
size_t _get_bytes_with_handle(const CacheKey& key, size_t charge, size_t bytes) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
Expand Down

0 comments on commit 2273279

Please sign in to comment.