Skip to content

Commit

Permalink
[feature](workloadgroup) use slot num to control memory distribution …
Browse files Browse the repository at this point in the history
…among queries

f

f

f
  • Loading branch information
Doris-Extras committed Aug 18, 2024
1 parent ff892b3 commit 740eb24
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 39 deletions.
9 changes: 9 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ class QueryContext {

// Overwrites the spill threshold stored on this query context.
void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
// Returns the current value of _spill_threshold.
int64_t spill_threshold() { return _spill_threshold; }
// Number of slots this query occupies: the query_slot_count query option
// when it is set, otherwise 1.
int32_t get_slot_count() {
    if (!_query_options.__isset.query_slot_count) {
        return 1;
    }
    return _query_options.query_slot_count;
}

// True only when the enable_query_slot_hard_limit query option is both set
// and enabled; an unset option is treated as false.
bool enable_query_slot_hard_limit() {
    const auto& options = _query_options;
    return options.__isset.enable_query_slot_hard_limit && options.enable_query_slot_hard_limit;
}
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
Expand Down
15 changes: 14 additions & 1 deletion be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
// 0 is not a valid slot count; it marks the value as unset and must be ignored during usage
const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;

WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
: _id(tg_info.id),
Expand Down Expand Up @@ -120,8 +122,12 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
_spill_low_watermark = tg_info.spill_low_watermark;
_spill_high_watermark = tg_info.spill_high_watermark;
<<<<<<< HEAD
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
=======
_total_query_slot_count = tg_info.total_query_slot_count;
>>>>>>> 5ad0005a4d ([feature](workloadgroup) use slot num to control memory distribution among queries)
} else {
return;
}
Expand Down Expand Up @@ -384,6 +390,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
remote_read_bytes_per_second = tworkload_group_info.remote_read_bytes_per_second;
}

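// Total query slot count of the workload group; keeps the default (an invalid
// value that is ignored) when the topic info does not set the field.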
int total_query_slot_count = TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.total_query_slot_count) {
total_query_slot_count = tworkload_group_info.total_query_slot_count;
}

return {.id = tg_id,
.name = name,
.cpu_share = cpu_share,
Expand All @@ -398,7 +410,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.spill_low_watermark = spill_low_watermark,
.spill_high_watermark = spill_high_watermark,
.read_bytes_per_second = read_bytes_per_second,
.remote_read_bytes_per_second = remote_read_bytes_per_second};
.remote_read_bytes_per_second = remote_read_bytes_per_second,
.total_query_slot_count = total_query_slot_count};
}

void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,15 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
int spill_threshold_low_water_mark() const {
return _spill_low_watermark.load(std::memory_order_relaxed);
}

int spill_threashold_high_water_mark() const {
return _spill_high_watermark.load(std::memory_order_relaxed);
}

// Relaxed atomic read of the workload group's total query slot count.
int total_query_slot_count() const {
    const int slot_total = _total_query_slot_count.load(std::memory_order_relaxed);
    return slot_total;
}

void set_weighted_memory_ratio(double ratio);
bool add_wg_refresh_interval_memory_growth(int64_t size) {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
Expand Down Expand Up @@ -230,6 +235,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::atomic<int> _spill_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
// Starts at 0; check_and_update() stores tg_info.total_query_slot_count here.
// Brace-initialized like the neighbouring atomics: copy-initializing a
// std::atomic from a value is ill-formed before C++17.
std::atomic<int> _total_query_slot_count {0};

// means workload group is mark dropped
// new query can not submit
Expand Down Expand Up @@ -271,8 +277,12 @@ struct WorkloadGroupInfo {
const int min_remote_scan_thread_num = 0;
const int spill_low_watermark = 0;
const int spill_high_watermark = 0;
// Merge resolution: keep both sides. The order matters because
// parse_topic_info() fills these with designated initializers in this order.
const int read_bytes_per_second = -1;
const int remote_read_bytes_per_second = -1;
const int total_query_slot_count = 0;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
Expand Down
92 changes: 62 additions & 30 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ struct WorkloadGroupMemInfo {
std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
std::list<std::shared_ptr<MemTrackerLimiter>>();
};

void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);

Expand Down Expand Up @@ -189,28 +188,12 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 10) << debug_msg;

LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
// 3.1 calculate query spill threshold of task group
auto wg_weighted_mem_limit =
int64_t(wg.second->memory_limit() * weighted_memory_limit_ratio);
auto wg_mem_limit = wg.second->memory_limit();
auto wg_weighted_mem_limit = int64_t(wg_mem_limit * weighted_memory_limit_ratio);
wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);

// 3.2 set workload groups weighted memory limit and all query spill threshold.
auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
int64_t query_spill_threshold =
wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
: wg_weighted_mem_limit;
for (const auto& query : wg.second->queries()) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
query_ctx->set_spill_threshold(query_spill_threshold);
}

// 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark.
auto all_query_ctxs = wg.second->queries();
bool is_low_wartermark = false;
bool is_high_wartermark = false;
wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
Expand All @@ -219,29 +202,78 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
debug_msg = fmt::format(
"\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
"used "
"ratio: {}, query count: {}, query spill threshold: {}",
"ratio: {}, query count: {}",
wg.second->name(),
PrettyPrinter::print(wg.second->memory_limit(), TUnit::BYTES),
PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
(double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit,
wg_query_count, PrettyPrinter::print(query_spill_threshold, TUnit::BYTES));
all_query_ctxs.size());
LOG_EVERY_T(INFO, 60) << debug_msg;
}
// If the low-watermark flag is not set for this workload group, skip the
// per-query limit calculation: set every query ctx's mem limit to -1, which
// means the query's mem limit is not checked during reserve.
if (!is_low_wartermark) {
for (const auto& query : all_query_ctxs) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
query_ctx->set_mem_limit(-1);
}
continue;
}

debug_msg += "\n Query Memory Summary:";
// check whether queries need to revoke memory for task group
int32_t total_used_slot_count = 0;
int32_t total_slot_count = wg.second->total_query_slot_count();
// calculate total used slot count
for (const auto& query : all_query_ctxs) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
total_used_slot_count += query_ctx->get_slot_count();
}
// calculate per query weighted memory limit
for (const auto& query : all_query_ctxs) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
int64_t query_weighted_mem_limit = 0;
// If the query enable hard limit, then it should not use the soft limit
if (query_ctx->enable_query_slot_hard_limit()) {
if (total_slot_count < 1) {
LOG(WARNING)
<< "query " << print_id(query_ctx->query_id())
<< " enabled hard limit, but the slot count < 1, could not take affect";
} else {
query_weighted_mem_limit =
(wg_weighted_mem_limit * query_ctx->get_slot_count()) /
total_slot_count;
}
} else {
query_weighted_mem_limit =
total_used_slot_count > 0
? (wg_weighted_mem_limit + total_used_slot_count) *
query_ctx->get_slot_count() / total_used_slot_count
: wg_weighted_mem_limit;
}

debug_msg = "Query Memory Summary:";
for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) {
debug_msg += fmt::format(
"\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, "
"\n MemTracker Label={}, Parent Label={}, Used={}, Limit={}, "
"Peak={}",
query_mem_tracker->label(), query_mem_tracker->parent_label(),
PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(query_spill_threshold, TUnit::BYTES),
PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
}
LOG_EVERY_T(INFO, 1) << debug_msg;
} else {
continue;
query_ctx->set_mem_limit(query_weighted_mem_limit);
}
LOG_EVERY_T(INFO, 60) << debug_msg;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ public void exec() throws Exception {
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
queueToken = queryQueue.getToken(context.getSessionVariable().wgQuerySlotCount);
queueToken.get(DebugUtil.printId(queryId),
this.queryOptions.getExecutionTimeout() * 1000);
}
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,10 @@ public class SessionVariable implements Serializable, Writable {

public static final String BYPASS_WORKLOAD_GROUP = "bypass_workload_group";

public static final String WG_QUERY_SLOT_COUNT = "wg_query_slot_count";

public static final String ENABLE_QUERY_SLOT_HARD_LIMIT = "enable_query_slot_hard_limit";

public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";

public static final String USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS = "use_max_length_of_varchar_in_ctas";
Expand Down Expand Up @@ -761,6 +765,17 @@ public class SessionVariable implements Serializable, Writable {
"whether bypass workload group's limitation, currently only support bypass query queue"})
public boolean bypassWorkloadGroup = false;

@VariableMgr.VarAttr(name = WG_QUERY_SLOT_COUNT, needForward = true, description = {
"每个查询占用的slot的数量,workload group的query slot的总数等于设置的最大并发数",
"Number of slots occupied by each query, the total number of query slots "
+ "of the workload group equals the maximum number of concurrent requests"})
public int wgQuerySlotCount = 1;

@VariableMgr.VarAttr(name = ENABLE_QUERY_SLOT_HARD_LIMIT, needForward = true, description = {
"是否通过硬限的方式来计算每个Slot的内存资源",
"Whether to calculate the memory resources of each Slot by hard limit"})
public boolean enableQuerySlotHardLimit = false;

@VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
public int maxColumnReaderNum = 20000;

Expand Down Expand Up @@ -3681,6 +3696,9 @@ public TQueryOptions toThrift() {
tResult.setEnableFallbackOnMissingInvertedIndex(enableFallbackOnMissingInvertedIndex);
tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
tResult.setQuerySlotCount(wgQuerySlotCount);
tResult.setEnableQuerySlotHardLimit(enableQuerySlotHardLimit);

tResult.setKeepCarriageReturn(keepCarriageReturn);

tResult.setEnableSegmentCache(enableSegmentCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Queue;
Expand Down Expand Up @@ -98,24 +99,42 @@ public String debugString() {
+ runningQueryQueue.size() + ", currentWaitingQueryNum=" + waitingQueryQueue.size();
}

public QueueToken getToken() throws UserException {
/**
 * Sums the slot counts of every token currently in the running queue.
 *
 * @return total number of query slots held by running queries
 */
public int usedSlotCount() {
    int occupiedSlots = 0;
    Iterator<?> running = runningQueryQueue.iterator();
    while (running.hasNext()) {
        occupiedSlots += ((QueueToken) running.next()).getQuerySlotCount();
    }
    return occupiedSlots;
}

public QueueToken getToken(int querySlotCount) throws UserException {
if (maxConcurrency > 0 && (querySlotCount > maxConcurrency || querySlotCount < 1)) {
throw new UserException("query slot count " + querySlotCount
+ " should be smaller than workload group's max concurrency "
+ maxConcurrency + " and > 0");
}
AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl();
queueLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
}
QueueToken queueToken = new QueueToken(queueTimeout, this);
QueueToken queueToken = new QueueToken(queueTimeout, querySlotCount, this);

boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency;
boolean hasFreeSlot = queueToken.getQuerySlotCount() <= maxConcurrency - usedSlotCount();
boolean isResourceAvailable = admissionControl.checkResourceAvailable(queueToken);
if (!isReachMaxCon && isResourceAvailable) {
if (!isReachMaxCon && isResourceAvailable && hasFreeSlot) {
runningQueryQueue.offer(queueToken);
queueToken.complete();
return queueToken;
} else if (waitingQueryQueue.size() >= maxQueueSize) {
throw new UserException("query waiting queue is full, queue length=" + maxQueueSize);
} else {
if (!hasFreeSlot) {
queueToken.setQueueMsg("NO_FREE_SLOT");
}
if (isReachMaxCon) {
queueToken.setQueueMsg("WAIT_IN_QUEUE");
}
Expand Down Expand Up @@ -145,14 +164,19 @@ public void releaseAndNotify(QueueToken releaseToken) {
AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl();
queueLock.lock();
try {
runningQueryQueue.remove(releaseToken);
waitingQueryQueue.remove(releaseToken);
admissionControl.removeQueueToken(releaseToken);
if (releaseToken != null) {
runningQueryQueue.remove(releaseToken);
waitingQueryQueue.remove(releaseToken);
admissionControl.removeQueueToken(releaseToken);
}
while (runningQueryQueue.size() < maxConcurrency) {
QueueToken queueToken = waitingQueryQueue.peek();
if (queueToken == null) {
break;
}
if (queueToken.getQuerySlotCount() > maxConcurrency - usedSlotCount()) {
break;
}
if (admissionControl.checkResourceAvailable(queueToken)) {
queueToken.complete();
runningQueryQueue.offer(queueToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public enum TokenState {

private long queueStartTime = -1;
private long queueEndTime = -1;
private final int querySlotCount;

private volatile String queueMsg = "";

Expand All @@ -62,10 +63,11 @@ public enum TokenState {
// Object is just a placeholder, it's meaningless now
private CompletableFuture<Object> future;

public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) {
/**
 * Creates a token for one query waiting on (or admitted by) a query queue.
 *
 * @param queueWaitTimeout wait timeout stored on the token
 * @param querySlotCount   number of slots this query occupies
 * @param queryQueue       queue this token belongs to
 */
public QueueToken(long queueWaitTimeout, int querySlotCount, QueryQueue queryQueue) {
    // Allocate the id and take the start timestamp first, in the original order.
    this.tokenId = tokenIdGenerator.addAndGet(1);
    this.queueStartTime = System.currentTimeMillis();
    // Plain field captures; order among these does not matter.
    this.queryQueue = queryQueue;
    this.querySlotCount = querySlotCount;
    this.queueWaitTimeout = queueWaitTimeout;
    this.future = new CompletableFuture<>();
}
Expand Down Expand Up @@ -146,4 +148,8 @@ public boolean equals(Object obj) {
/**
 * @return the id assigned to this token in the constructor
 */
public long getTokenId() {
return tokenId;
}

/**
 * @return the slot count passed to the constructor; fixed for the token's lifetime
 */
public int getQuerySlotCount() {
return querySlotCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,11 @@ public TopicInfo toTopicInfo() {
if (!StringUtils.isEmpty(tagStr)) {
tWorkloadGroupInfo.setTag(tagStr);
}

String totalQuerySlotCountStr = properties.get(MAX_CONCURRENCY);
if (totalQuerySlotCountStr != null) {
tWorkloadGroupInfo.setTotalQuerySlotCount(Integer.parseInt(totalQuerySlotCountStr));
}

TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
Expand Down
Loading

0 comments on commit 740eb24

Please sign in to comment.