Skip to content

Commit

Permalink
[enhancement](memory) Add ThreadMemTrackerMgr BE UT (#35518)
Browse files Browse the repository at this point in the history
For thread context tracking memory
  • Loading branch information
xinyiZzz authored and dataroaring committed May 31, 2024
1 parent cc4a447 commit 7be7b37
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 10 deletions.
4 changes: 3 additions & 1 deletion be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ class ExecEnv {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();

static void set_tracking_memory(bool tracking_memory) {
_s_tracking_memory.store(tracking_memory, std::memory_order_acquire);
}
#endif
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }

Expand Down
8 changes: 6 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class ThreadMemTrackerMgr {
fmt::to_string(consumer_tracker_buf));
}

int64_t untracked_mem() const { return _untracked_mem; }
int64_t reserved_mem() const { return _reserved_mem; }

private:
// is false: ExecEnv::ready() = false when thread local is initialized
bool _init = false;
Expand Down Expand Up @@ -190,7 +193,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {

inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
if (_reserved_mem != 0) {
if (_reserved_mem >= size) {
if (_reserved_mem > size) {
// only need to subtract _reserved_mem, no need to consume MemTracker,
// every time _reserved_mem is minus the sum of size >= SYNC_PROC_RESERVED_INTERVAL_BYTES,
// subtract size from process global reserved memory,
Expand All @@ -208,7 +211,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
}
return;
} else {
// reserved memory is insufficient, the remaining _reserved_mem is subtracted from this memory consumed,
// _reserved_mem <= size, reserved memory used done,
// the remaining _reserved_mem is subtracted from this memory consumed,
// and reset _reserved_mem to 0, and subtract the remaining _reserved_mem from
// process global reserved memory, this means that all reserved memory has been used by BE process.
size -= _reserved_mem;
Expand Down
6 changes: 0 additions & 6 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,12 @@ class ThreadContext {

void attach_task(const TUniqueId& task_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
// will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
thread_mem_tracker_mgr->set_query_id(_task_id);
Expand Down Expand Up @@ -380,9 +378,7 @@ class AttachTask {
class SwitchThreadMemTrackerLimiter {
public:
explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
DCHECK(mem_tracker);
#endif
ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
Expand All @@ -391,9 +387,7 @@ class SwitchThreadMemTrackerLimiter {
explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) {
ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() == query_thread_context.query_id);
#ifndef BE_TEST
DCHECK(query_thread_context.query_mem_tracker);
#endif
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ TEST(MemTrackerTest, SingleTrackerNoLimit) {
t->release(5);
}

TEST(MemTestTest, SingleTrackerWithLimit) {
TEST(MemTrackerTest, SingleTrackerWithLimit) {
auto t = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "limit tracker",
11);
EXPECT_TRUE(t->has_limit());
Expand Down
Loading

0 comments on commit 7be7b37

Please sign in to comment.