diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 41d8c74032649e..d877096aec2b9c 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -263,7 +263,9 @@ class ExecEnv { this->_dummy_lru_cache = dummy_lru_cache; } void set_write_cooldown_meta_executors(); - + static void set_tracking_memory(bool tracking_memory) { + _s_tracking_memory.store(tracking_memory, std::memory_order_acquire); + } #endif LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 64c2190a149076..9d36cd2d807813 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -125,6 +125,9 @@ class ThreadMemTrackerMgr { fmt::to_string(consumer_tracker_buf)); } + int64_t untracked_mem() const { return _untracked_mem; } + int64_t reserved_mem() const { return _reserved_mem; } + private: // is false: ExecEnv::ready() = false when thread local is initialized bool _init = false; @@ -190,7 +193,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() { inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) { if (_reserved_mem != 0) { - if (_reserved_mem >= size) { + if (_reserved_mem > size) { // only need to subtract _reserved_mem, no need to consume MemTracker, // every time _reserved_mem is minus the sum of size >= SYNC_PROC_RESERVED_INTERVAL_BYTES, // subtract size from process global reserved memory, @@ -208,7 +211,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che } return; } else { - // reserved memory is insufficient, the remaining _reserved_mem is subtracted from this memory consumed, + // _reserved_mem <= size, reserved memory used done, + // the remaining _reserved_mem is subtracted from this memory consumed, // and reset _reserved_mem to 0, and subtract the remaining _reserved_mem from // process global reserved memory, this means that all reserved memory has been used by BE process. size -= _reserved_mem; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 72d3c8111f6c1c..7a4695a4e982ec 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -156,14 +156,12 @@ class ThreadContext { void attach_task(const TUniqueId& task_id, const std::shared_ptr& mem_tracker) { -#ifndef BE_TEST // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. DCHECK(mem_tracker); // Orphan is thread default tracker. DCHECK(thread_mem_tracker()->label() == "Orphan") << ", thread mem tracker label: " << thread_mem_tracker()->label() << ", attach mem tracker label: " << mem_tracker->label(); -#endif _task_id = task_id; thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); thread_mem_tracker_mgr->set_query_id(_task_id); @@ -374,9 +372,7 @@ class AttachTask { class SwitchThreadMemTrackerLimiter { public: explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr& mem_tracker) { -#ifndef BE_TEST DCHECK(mem_tracker); -#endif ThreadLocalHandle::create_thread_local_if_not_exits(); _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); @@ -385,9 +381,7 @@ class SwitchThreadMemTrackerLimiter { explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) { ThreadLocalHandle::create_thread_local_if_not_exits(); DCHECK(thread_context()->task_id() == query_thread_context.query_id); -#ifndef BE_TEST DCHECK(query_thread_context.query_mem_tracker); -#endif _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( query_thread_context.query_mem_tracker); diff --git a/be/test/runtime/mem_limit_test.cpp b/be/test/runtime/memory/mem_tracker_test.cpp similarity index 97% rename from be/test/runtime/mem_limit_test.cpp rename to be/test/runtime/memory/mem_tracker_test.cpp index e6630b1432d0e4..49f6aa3bf0cebe 100644 --- a/be/test/runtime/mem_limit_test.cpp +++ b/be/test/runtime/memory/mem_tracker_test.cpp @@ -38,7 +38,7 @@ TEST(MemTrackerTest, SingleTrackerNoLimit) { t->release(5); } -TEST(MemTestTest, SingleTrackerWithLimit) { +TEST(MemTrackerTest, SingleTrackerWithLimit) { auto t = std::make_unique(MemTrackerLimiter::Type::GLOBAL, "limit tracker", 11); EXPECT_TRUE(t->has_limit()); diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp new file mode 100644 index 00000000000000..29c2759fcb727f --- /dev/null +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/memory/thread_mem_tracker_mgr.h" + +#include +#include + +#include "gtest/gtest_pred_impl.h" +#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/thread_context.h" + +namespace doris { + +TEST(ThreadMemTrackerMgrTest, ConsumeMemory) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ConsumeMemory"); + + int64_t size1 = 4 * 1024; + int64_t size2 = 4 * 1024 * 1024; + + thread_context->attach_task(TUniqueId(), t); + thread_context->consume_memory(size1); + // size1 < config::mem_tracker_consume_min_size_bytes, not consume mem tracker. + EXPECT_EQ(t->consumption(), 0); + + thread_context->consume_memory(size2); + // size1 + size2 > onfig::mem_tracker_consume_min_size_bytes, consume mem tracker. + EXPECT_EQ(t->consumption(), size1 + size2); + + thread_context->consume_memory(-size1); + // std::abs(-size1) < config::mem_tracker_consume_min_size_bytes, not consume mem tracker. + EXPECT_EQ(t->consumption(), size1 + size2); + + thread_context->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), size2); + + thread_context->consume_memory(-size2); + // std::abs(-size2) > onfig::mem_tracker_consume_min_size_bytes, consume mem tracker. + EXPECT_EQ(t->consumption(), 0); + + thread_context->consume_memory(-size2); + EXPECT_EQ(t->consumption(), -size2); + + thread_context->consume_memory(-size1); + EXPECT_EQ(t->consumption(), -size2); + + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size2 * 2); + thread_context->consume_memory(size2 * 10); + thread_context->consume_memory(size2 * 100); + thread_context->consume_memory(size2 * 1000); + thread_context->consume_memory(size2 * 10000); + thread_context->consume_memory(-size2 * 2); + thread_context->consume_memory(-size2 * 10); + thread_context->consume_memory(-size2 * 100); + thread_context->consume_memory(-size2 * 1000); + thread_context->consume_memory(-size2 * 10000); + thread_context->detach_task(); + EXPECT_EQ(t->consumption(), 0); // detach automatic call flush_untracked_mem. +} + +TEST(ThreadMemTrackerMgrTest, Boundary) { + // TODO, Boundary check may not be necessary, add some `IF` maybe increase cost time. +} + +TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t1 = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker1"); + std::shared_ptr t2 = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker2"); + std::shared_ptr t3 = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3"); + + int64_t size1 = 4 * 1024; + int64_t size2 = 4 * 1024 * 1024; + + thread_context->attach_task(TUniqueId(), t1); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + EXPECT_EQ(t1->consumption(), size1 + size2); + + thread_context->consume_memory(size1); + thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2); + EXPECT_EQ(t1->consumption(), + size1 + size2 + size1); // attach automatic call flush_untracked_mem. + + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size1); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1); // not changed, now consume t2 + EXPECT_EQ(t2->consumption(), size1 + size2); + + thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // detach + EXPECT_EQ(t2->consumption(), + size1 + size2 + size1); // detach automatic call flush_untracked_mem. + + thread_context->consume_memory(size2); + thread_context->consume_memory(size2); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2); + EXPECT_EQ(t2->consumption(), size1 + size2 + size1); // not changed, now consume t1 + + thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2); + thread_context->consume_memory(-size1); + thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size1); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2); + EXPECT_EQ(t2->consumption(), size1 + size2); // attach automatic call flush_untracked_mem. + EXPECT_EQ(t3->consumption(), size1 + size2); + + thread_context->consume_memory(-size1); + thread_context->consume_memory(-size2); + thread_context->consume_memory(-size1); + EXPECT_EQ(t3->consumption(), size1); + + thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); // detach + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2); + EXPECT_EQ(t2->consumption(), size1 + size2); + EXPECT_EQ(t3->consumption(), 0); + + thread_context->consume_memory(-size1); + thread_context->consume_memory(-size2); + thread_context->consume_memory(-size1); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2); + EXPECT_EQ(t2->consumption(), 0); + + thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // detach + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2); + EXPECT_EQ(t2->consumption(), -size1); + + thread_context->consume_memory(-t1->consumption()); + thread_context->detach_task(); // detach t1 + EXPECT_EQ(t1->consumption(), 0); +} + +TEST(ThreadMemTrackerMgrTest, MultiMemTracker) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t1 = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-MultiMemTracker1"); + std::shared_ptr t2 = std::make_shared("UT-MultiMemTracker2", t1.get()); + std::shared_ptr t3 = std::make_shared("UT-MultiMemTracker3", t1.get()); + + int64_t size1 = 4 * 1024; + int64_t size2 = 4 * 1024 * 1024; + + thread_context->attach_task(TUniqueId(), t1); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size1); + EXPECT_EQ(t1->consumption(), size1 + size2); + + bool rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get()); + EXPECT_EQ(rt, true); + EXPECT_EQ(t1->consumption(), size1 + size2); + EXPECT_EQ(t2->consumption(), -size1); // _untracked_mem = size1 + + thread_context->consume_memory(size2); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2); + EXPECT_EQ(t2->consumption(), size2); + + rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get()); + EXPECT_EQ(rt, false); + thread_context->consume_memory(size2); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2); + EXPECT_EQ(t2->consumption(), size2 + size2); + + rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t3.get()); + EXPECT_EQ(rt, true); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(-size1); // _untracked_mem = -size1 + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2); + EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2); + EXPECT_EQ(t3->consumption(), size1 + size2); + + thread_context->thread_mem_tracker_mgr->pop_consumer_tracker(); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1); + EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1); + EXPECT_EQ(t3->consumption(), size1 + size2 - size1); + + thread_context->consume_memory(-size2); + thread_context->consume_memory(size2); + thread_context->consume_memory(-size2); + thread_context->thread_mem_tracker_mgr->pop_consumer_tracker(); + EXPECT_EQ(t1->consumption(), + size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1 - size2); + EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - size2); + EXPECT_EQ(t3->consumption(), size1 + size2 - size1); + + thread_context->consume_memory(-t1->consumption()); + thread_context->detach_task(); // detach t1 + EXPECT_EQ(t1->consumption(), 0); + EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - size2); + EXPECT_EQ(t3->consumption(), size1 + size2 - size1); +} + +TEST(ThreadMemTrackerMgrTest, ScopedCount) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t1 = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ScopedCount"); + + int64_t size1 = 4 * 1024; + int64_t size2 = 4 * 1024 * 1024; + + thread_context->attach_task(TUniqueId(), t1); + thread_context->thread_mem_tracker_mgr->start_count_scope_mem(); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size1); + int64_t scope_mem = thread_context->thread_mem_tracker_mgr->stop_count_scope_mem(); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size1); + EXPECT_EQ(t1->consumption(), scope_mem); + + thread_context->consume_memory(-size2); + thread_context->consume_memory(-size1); + thread_context->consume_memory(-size2); + EXPECT_EQ(t1->consumption(), size1 + size1); + EXPECT_EQ(scope_mem, size1 + size2 + size1 + size2 + size1); +} + +TEST(ThreadMemTrackerMgrTest, ReserveMemory) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ReserveMemory"); + + int64_t size1 = 4 * 1024; + int64_t size2 = 4 * 1024 * 1024; + int64_t size3 = size2 * 1024; + + thread_context->attach_task(TUniqueId(), t); + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + EXPECT_EQ(t->consumption(), size1 + size2); + + thread_context->try_reserve_memory(size3); + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); + + thread_context->consume_memory(size2); + thread_context->consume_memory(-size2); + thread_context->consume_memory(size2); + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->consume_memory(-size1); + thread_context->consume_memory(-size1); + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + // std::abs(-size1 - size1) < SYNC_PROC_RESERVED_INTERVAL_BYTES, not update process_reserved_memory. + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->consume_memory(size2 * 1023); + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size1); + + std::cout << "11111 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", " + << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl; + thread_context->consume_memory(size1); + thread_context->consume_memory(size1); + std::cout << "2222 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", " + << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl; + std::cout << "3333 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", " + << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl; + // reserved memory used done + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); + + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + // no reserved memory, normal memory consumption + EXPECT_EQ(t->consumption(), size1 + size2 + size3 + size1 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); + + thread_context->consume_memory(-size3); + thread_context->consume_memory(-size1); + thread_context->consume_memory(-size2); + EXPECT_EQ(t->consumption(), size1 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); + + thread_context->try_reserve_memory(size3); + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); + + thread_context->consume_memory(-size1); + // ThreadMemTrackerMgr _reserved_mem = size3 + size1 + // ThreadMemTrackerMgr _untracked_mem = -size1 + thread_context->consume_memory(size3); + // ThreadMemTrackerMgr _reserved_mem = size1 + // ThreadMemTrackerMgr _untracked_mem = -size1 + size3 + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), + size1); // size3 + size1 - size3 + + thread_context->consume_memory(-size3); + // ThreadMemTrackerMgr _reserved_mem = size1 + size3 + // ThreadMemTrackerMgr _untracked_mem = 0, std::abs(-size3) > SYNC_PROC_RESERVED_INTERVAL_BYTES, + // so update process_reserved_memory. + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size3); + + thread_context->consume_memory(size1); + thread_context->consume_memory(size2); + thread_context->consume_memory(size1); + // ThreadMemTrackerMgr _reserved_mem = size1 + size3 - size1 - size2 - size1 = size3 - size2 - size1 + // ThreadMemTrackerMgr _untracked_mem = size1 + EXPECT_EQ(t->consumption(), size1 + size2 + size3); + // size1 + size3 - (size1 + size2) + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->release_reserved_memory(); + // size1 + size2 + size3 - _reserved_mem, size1 + size2 + size3 - (size3 - size2 - size1) + EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2); + // size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 - ((size3 - size2 - size1) + (size1)) = 0 + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); + + thread_context->detach_task(); + EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); +} + +TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory"); + + int64_t size2 = 4 * 1024 * 1024; + int64_t size3 = size2 * 1024; + + thread_context->attach_task(TUniqueId(), t); + thread_context->try_reserve_memory(size3); + EXPECT_EQ(t->consumption(), size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); + + thread_context->consume_memory(size2); + // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + // ThreadMemTrackerMgr _untracked_mem = 0, size2 > SYNC_PROC_RESERVED_INTERVAL_BYTES, + // update process_reserved_memory. + EXPECT_EQ(t->consumption(), size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->try_reserve_memory(size2); + // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2 + // ThreadMemTrackerMgr _untracked_mem = 0 + EXPECT_EQ(t->consumption(), size3 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), + size3); // size3 - size2 + size2 + + thread_context->try_reserve_memory(size3); + thread_context->try_reserve_memory(size3); + thread_context->consume_memory(size3); + thread_context->consume_memory(size2); + thread_context->consume_memory(size3); + // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + // ThreadMemTrackerMgr _untracked_mem = 0 + EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->release_reserved_memory(); + // size3 + size2 + size3 + size3 - _reserved_mem, size3 + size2 + size3 + size3 - (size3 - size2) + EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2); + // size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 - ((size3 - size2 - size1) + (size1)) = 0 + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); + + thread_context->detach_task(); + EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); +} + +TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { + std::unique_ptr thread_context = std::make_unique(); + std::shared_ptr t1 = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory1"); + std::shared_ptr t2 = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory2"); + std::shared_ptr t3 = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory3"); + + int64_t size1 = 4 * 1024; + int64_t size2 = 4 * 1024 * 1024; + int64_t size3 = size2 * 1024; + + thread_context->attach_task(TUniqueId(), t1); + thread_context->try_reserve_memory(size3); + thread_context->consume_memory(size2); + EXPECT_EQ(t1->consumption(), size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2); + thread_context->try_reserve_memory(size3); + EXPECT_EQ(t1->consumption(), size3); + EXPECT_EQ(t2->consumption(), size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3); + + thread_context->consume_memory(size2 + size3); // reserved memory used done + EXPECT_EQ(t1->consumption(), size3); + EXPECT_EQ(t2->consumption(), size3 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3); + thread_context->try_reserve_memory(size3); + EXPECT_EQ(t1->consumption(), size3); + EXPECT_EQ(t2->consumption(), size3 + size2); + EXPECT_EQ(t3->consumption(), size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3); + + thread_context->consume_memory(-size2); + thread_context->consume_memory(-size1); + // ThreadMemTrackerMgr _reserved_mem = size3 + size2 + size1 + // ThreadMemTrackerMgr _untracked_mem = -size1 + EXPECT_EQ(t1->consumption(), size3); + EXPECT_EQ(t2->consumption(), size3 + size2); + EXPECT_EQ(t3->consumption(), size3); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), + size3 - size2 + size3 + size2); + + thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); // detach + EXPECT_EQ(t1->consumption(), size3); + EXPECT_EQ(t2->consumption(), size3 + size2); + EXPECT_EQ(t3->consumption(), -size1 - size2); // size3 - _reserved_mem + // size3 - size2 + size3 + size2 - (_reserved_mem + _untracked_mem) + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // detach + EXPECT_EQ(t1->consumption(), size3); + // not changed, reserved memory used done. + EXPECT_EQ(t2->consumption(), size3 + size2); + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); + + thread_context->detach_task(); + EXPECT_EQ(t1->consumption(), size2); // size3 - _reserved_mem + // size3 - size2 - (_reserved_mem + _untracked_mem) + EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); +} + +} // end namespace doris diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index de088f8d17b27b..75afdacd87ba6a 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -69,6 +69,8 @@ int main(int argc, char** argv) { static_cast(service->start()); doris::global_test_http_host = "http://127.0.0.1:" + std::to_string(service->get_real_port()); + doris::ExecEnv::GetInstance()->set_tracking_memory(false); + int res = RUN_ALL_TESTS(); return res; }