Skip to content

Commit

Permalink
[feature](workloadgroup) use slot num to control memory distribution …
Browse files Browse the repository at this point in the history
…among queries
  • Loading branch information
Doris-Extras committed Sep 18, 2024
1 parent 736dd55 commit 914df8c
Show file tree
Hide file tree
Showing 17 changed files with 206 additions and 72 deletions.
3 changes: 1 addition & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
<< print_id(query_ctx->query_id());
}
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
seek_num++;
// 32M small query does not cancel
if (tracker->consumption() <= 33554432 ||
tracker->consumption() < tracker->limit()) {
tracker->consumption() < tracker->_limit) {
small_num++;
continue;
}
Expand All @@ -719,7 +719,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
continue;
}
auto overcommit_ratio = int64_t(
(static_cast<double>(tracker->consumption()) / tracker->limit()) *
(static_cast<double>(tracker->consumption()) / tracker->_limit) *
10000);
max_pq.emplace(overcommit_ratio, tracker->label());
query_consumption[tracker->label()] = tracker->consumption();
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class MemTrackerLimiter final {
bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); }
Status check_limit(int64_t bytes = 0);
bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; }
void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }

Expand Down Expand Up @@ -340,7 +341,7 @@ class MemTrackerLimiter final {
MemCounter _reserved_counter;

// Limit on memory consumption, in bytes.
int64_t _limit;
std::atomic<int64_t> _limit;

// Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp.
int64_t _group_num;
Expand Down
16 changes: 8 additions & 8 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,26 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,

void QueryContext::_init_query_mem_tracker() {
bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
int64_t _bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
if (_bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
int64_t bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
if (bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< ". Using process memory limit instead";
_bytes_limit = MemInfo::mem_limit();
bytes_limit = MemInfo::mem_limit();
}
if (_query_options.query_type == TQueryType::SELECT) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
_bytes_limit);
bytes_limit);
} else if (_query_options.query_type == TQueryType::LOAD) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
_bytes_limit);
bytes_limit);
} else { // EXTERNAL
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
_bytes_limit);
bytes_limit);
}
if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
Expand All @@ -152,7 +152,7 @@ QueryContext::~QueryContext() {
std::string mem_tracker_msg;
if (query_mem_tracker->peak_consumption() != 0) {
mem_tracker_msg = fmt::format(
", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
", deregister query/load memory tracker, queryId={}, CurrUsed={}, "
"PeakUsed={}",
print_id(_query_id), MemCounter::print_bytes(query_mem_tracker->limit()),
MemCounter::print_bytes(query_mem_tracker->consumption()),
Expand Down
18 changes: 13 additions & 5 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }

int64_t mem_limit() const { return _bytes_limit; }

void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
Expand Down Expand Up @@ -238,8 +236,19 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
bool* has_running_task) const;
size_t get_revocable_size() const;

void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
int64_t spill_threshold() { return _spill_threshold; }
void set_mem_limit(int64_t new_mem_limit) { query_mem_tracker->set_limit(new_mem_limit); }

std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; }

int32_t get_slot_count() {
return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1;
}

bool enable_query_slot_hard_limit() {
return _query_options.__isset.enable_query_slot_hard_limit
? _query_options.enable_query_slot_hard_limit
: false;
}
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
Expand Down Expand Up @@ -319,7 +328,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
MonotonicStopWatch _query_watcher;
int64_t _bytes_limit = 0;
bool _is_pipeline = false;
bool _is_nereids = false;
std::atomic<int> _running_big_mem_op_num = 0;
Expand Down
16 changes: 12 additions & 4 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {

AttachTask::AttachTask(RuntimeState* runtime_state) {
signal::set_signal_is_nereids(runtime_state->is_nereids());
QueryThreadContext query_thread_context = {runtime_state->query_id(),
runtime_state->query_mem_tracker(),
runtime_state->get_query_ctx()->workload_group()};
init(query_thread_context);
// RuntimeState not always has query ctx.
// For example during push handler or schema change
if (runtime_state->get_query_ctx() == nullptr) {
QueryThreadContext query_thread_context = {runtime_state->query_id(),
runtime_state->query_mem_tracker()};
init(query_thread_context);
} else {
QueryThreadContext query_thread_context = {
runtime_state->query_id(), runtime_state->query_mem_tracker(),
runtime_state->get_query_ctx()->workload_group()};
init(query_thread_context);
}
}

AttachTask::AttachTask(const QueryThreadContext& query_thread_context) {
Expand Down
12 changes: 11 additions & 1 deletion be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
// This is a invalid value, and should ignore this value during usage
const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;

WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
: _id(tg_info.id),
Expand Down Expand Up @@ -141,6 +143,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_spill_high_watermark = tg_info.spill_high_watermark;
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
_total_query_slot_count = tg_info.total_query_slot_count;
} else {
return;
}
Expand Down Expand Up @@ -403,6 +406,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
remote_read_bytes_per_second = tworkload_group_info.remote_read_bytes_per_second;
}

// 16 total slots
int total_query_slot_count = TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.total_query_slot_count) {
total_query_slot_count = tworkload_group_info.total_query_slot_count;
}

return {.id = tg_id,
.name = name,
.cpu_share = cpu_share,
Expand All @@ -417,7 +426,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.spill_low_watermark = spill_low_watermark,
.spill_high_watermark = spill_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second};
.remote_read_bytes_per_second = remote_read_bytes_per_second,
.total_query_slot_count = total_query_slot_count};
}

void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
Expand Down
16 changes: 14 additions & 2 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,22 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
}
int spill_threashold_high_water_mark() const {

int spill_threshold_high_water_mark() const {
return _spill_high_watermark.load(std::memory_order_relaxed);
}

void set_weighted_memory_ratio(double ratio);
int total_query_slot_count() const {
return _total_query_slot_count.load(std::memory_order_relaxed);
}

bool add_wg_refresh_interval_memory_growth(int64_t size) {
// If a group is enable memory overcommit, then not need check the limit
// It is always true, and it will only fail when process memory is not
// enough.
if (_enable_memory_overcommit) {
return true;
}
auto realtime_total_mem_used =
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
Expand Down Expand Up @@ -232,6 +242,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::atomic<int> _spill_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
std::atomic<int> _total_query_slot_count = 0;

// means workload group is mark dropped
// new query can not submit
Expand Down Expand Up @@ -275,6 +286,7 @@ struct WorkloadGroupInfo {
const int spill_high_watermark = 0;
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
const int total_query_slot_count = 0;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
Expand Down
97 changes: 68 additions & 29 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ struct WorkloadGroupMemInfo {
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
std::list<std::shared_ptr<MemTrackerLimiter>>();
};

void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);

Expand Down Expand Up @@ -195,59 +194,99 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 10) << debug_msg;

LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
// 3.1 calculate query spill threshold of task group
auto wg_weighted_mem_limit =
int64_t(wg.second->memory_limit() * weighted_memory_limit_ratio);
auto wg_mem_limit = wg.second->memory_limit();
auto wg_weighted_mem_limit = int64_t(wg_mem_limit * weighted_memory_limit_ratio);
wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);

// 3.2 set workload groups weighted memory limit and all query spill threshold.
auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
int64_t query_spill_threshold =
wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
: wg_weighted_mem_limit;
for (const auto& query : wg.second->queries()) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
query_ctx->set_spill_threshold(query_spill_threshold);
}

// 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark.
auto all_query_ctxs = wg.second->queries();
bool is_low_wartermark = false;
bool is_high_wartermark = false;
wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
int64_t wg_high_water_mark_limit =
wg_mem_limit * wg.second->spill_threshold_high_water_mark() / 100;
int64_t weighted_high_water_mark_limit =
wg_weighted_mem_limit * wg.second->spill_threshold_high_water_mark() / 100;
std::string debug_msg;
if (is_high_wartermark || is_low_wartermark) {
debug_msg = fmt::format(
"\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
"used "
"ratio: {}, query count: {}, query spill threshold: {}",
"high water mark mem limit: {}, used ratio: {}",
wg.second->name(),
PrettyPrinter::print(wg.second->memory_limit(), TUnit::BYTES),
PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
(double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit,
wg_query_count, PrettyPrinter::print(query_spill_threshold, TUnit::BYTES));
PrettyPrinter::print(weighted_high_water_mark_limit, TUnit::BYTES),
(double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit);

debug_msg += "\n Query Memory Summary:";
// check whether queries need to revoke memory for task group
for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) {
debug_msg += fmt::format(
"\n MemTracker Label={}, Used={}, SpillThreshold={}, "
"\n MemTracker Label={}, Used={}, MemLimit={}, "
"Peak={}",
query_mem_tracker->label(),
PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(query_spill_threshold, TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
}
LOG_EVERY_T(INFO, 1) << debug_msg;
} else {
continue;
}

int32_t total_used_slot_count = 0;
int32_t total_slot_count = wg.second->total_query_slot_count();
// calculate total used slot count
for (const auto& query : all_query_ctxs) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
total_used_slot_count += query_ctx->get_slot_count();
}
// calculate per query weighted memory limit
debug_msg = "Query Memory Summary:";
for (const auto& query : all_query_ctxs) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
int64_t query_weighted_mem_limit = 0;
// If the query enable hard limit, then it should not use the soft limit
if (query_ctx->enable_query_slot_hard_limit()) {
if (total_slot_count < 1) {
LOG(WARNING)
<< "query " << print_id(query_ctx->query_id())
<< " enabled hard limit, but the slot count < 1, could not take affect";
} else {
// If the query enable hard limit, then not use weighted info any more, just use the settings limit.
query_weighted_mem_limit =
(wg_high_water_mark_limit * query_ctx->get_slot_count()) /
total_slot_count;
}
} else {
// If low water mark is not reached, then use process memory limit as query memory limit.
// It means it will not take effect.
if (!is_low_wartermark) {
query_weighted_mem_limit = process_memory_limit;
} else {
query_weighted_mem_limit =
total_used_slot_count > 0
? (wg_high_water_mark_limit + total_used_slot_count) *
query_ctx->get_slot_count() / total_used_slot_count
: wg_high_water_mark_limit;
}
}
debug_msg += fmt::format(
"\n MemTracker Label={}, Used={}, Limit={}, Peak={}",
query_ctx->get_mem_tracker()->label(),
PrettyPrinter::print(query_ctx->get_mem_tracker()->consumption(), TUnit::BYTES),
PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES),
PrettyPrinter::print(query_ctx->get_mem_tracker()->peak_consumption(),
TUnit::BYTES));

query_ctx->set_mem_limit(query_weighted_mem_limit);
}
LOG_EVERY_T(INFO, 60) << debug_msg;
}
}

Expand Down
6 changes: 0 additions & 6 deletions be/test/runtime/memory/mem_tracker_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,24 @@ namespace doris {

TEST(MemTrackerTest, SingleTrackerNoLimit) {
auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL);
EXPECT_FALSE(t->has_limit());
t->consume(10);
EXPECT_EQ(t->consumption(), 10);
t->consume(10);
EXPECT_EQ(t->consumption(), 20);
t->release(15);
EXPECT_EQ(t->consumption(), 5);
EXPECT_FALSE(t->limit_exceeded());
t->release(5);
}

TEST(MemTrackerTest, SingleTrackerWithLimit) {
auto t = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "limit tracker",
11);
EXPECT_TRUE(t->has_limit());
t->consume(10);
EXPECT_EQ(t->consumption(), 10);
EXPECT_FALSE(t->limit_exceeded());
t->consume(10);
EXPECT_EQ(t->consumption(), 20);
EXPECT_TRUE(t->limit_exceeded());
t->release(15);
EXPECT_EQ(t->consumption(), 5);
EXPECT_FALSE(t->limit_exceeded());
t->release(5);
}

Expand Down
Loading

0 comments on commit 914df8c

Please sign in to comment.