Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](executor) Add tvf and regression test for Workload Scheduler #28733

Merged
merged 3 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::WORKLOAD_GROUPS:
RETURN_IF_ERROR(_build_workload_groups_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::WORKLOAD_SCHED_POLICY:
RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::CATALOGS:
RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request));
break;
Expand Down Expand Up @@ -369,6 +372,23 @@ Status VMetaScanner::_build_workload_groups_metadata_request(
return Status::OK();
}

Status VMetaScanner::_build_workload_sched_policy_metadata_request(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method '_build_workload_sched_policy_metadata_request' can be made static [readability-convert-member-functions-to-static]

be/src/vec/exec/scan/vmeta_scanner.h:81:

-     Status _build_workload_sched_policy_metadata_request(const TMetaScanRange& meta_scan_range,
+     static Status _build_workload_sched_policy_metadata_request(const TMetaScanRange& meta_scan_range,

const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_workload_sched_policy_metadata_request";

// create request
request->__set_cluster_name("");
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY);
metadata_table_params.__set_current_user_ident(_user_identity);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_catalogs_metadata_request";
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class VMetaScanner : public VScanner {
TFetchSchemaTableDataRequest* request);
Status _build_workload_groups_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_workload_sched_policy_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,19 @@ public Long getWorkloadGroupIdByName(String name) {
}
}

public String getWorkloadGroupNameById(Long id) {
readLock();
try {
WorkloadGroup wg = idToWorkloadGroup.get(id);
if (wg == null) {
return null;
}
return wg.getName();
} finally {
readUnlock();
}
}

// for ut
public Map<String, WorkloadGroup> getNameToWorkloadGroup() {
return nameToWorkloadGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.doris.resource.workloadschedpolicy;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;

import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;

public class WorkloadActionMeta {

Expand All @@ -44,4 +46,18 @@ static WorkloadActionType getWorkloadActionType(String strType) throws UserExcep
}
throw new UserException("invalid action type " + strType);
}

public String toString() {
if (StringUtils.isEmpty(actionArgs)) {
return action.toString();
} else {
String retActionArgs = actionArgs;
if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(action)) {
retActionArgs = Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroupNameById(Long.valueOf(actionArgs));
}
retActionArgs = retActionArgs == null ? "-1" : retActionArgs;
return action + " \"" + retActionArgs + "\"";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ static WorkloadConditionOperator getOperator(String op) throws UserException {
}
}

// used for select tvf
static String getOperatorStr(WorkloadConditionOperator op) {
switch (op) {
case EQUAL:
return "=";
case GREATER:
return ">";
case GREATER_EQUAL:
return ">=";
case LESS:
return "<";
case LESS_EQUAl:
return "<=";
default:
throw new RuntimeException("unexpected compare operator " + op);
}
}

static boolean compareInteger(WorkloadConditionOperator operator, long firstArgs, long secondArgs) {
switch (operator) {
case EQUAL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ private static WorkloadMetricType getMetricType(String metricStr) throws UserExc
}

public String toString() {
return metricName + " " + op + " " + value;
return metricName + " " + WorkloadConditionCompareUtils.getOperatorStr(op) + " " + value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public class WorkloadQueryInfo {
String queryId = null;
TUniqueId tUniqueId = null;
ConnectContext context = null;
Map<WorkloadMetricType, String> metricMap;
public Map<WorkloadMetricType, String> metricMap;
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
private List<WorkloadCondition> workloadConditionList;
private List<WorkloadAction> workloadActionList;

// for ut
public WorkloadSchedPolicy() {
}

// for ut
public void setWorkloadConditionList(List<WorkloadCondition> workloadConditionList) {
this.workloadConditionList = workloadConditionList;
}

public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloadConditionList,
List<WorkloadAction> workloadActionList, Map<String, String> properties) throws UserException {
this.id = id;
Expand All @@ -77,7 +86,7 @@ public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloa
// return false,
// 1 metric not match
// 2 condition value not match query info's value
boolean isMatch(WorkloadQueryInfo queryInfo) {
public boolean isMatch(WorkloadQueryInfo queryInfo) {
for (WorkloadCondition condition : workloadConditionList) {
WorkloadMetricType metricType = condition.getMetricType();
String value = queryInfo.metricMap.get(metricType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TUserIdentity;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -69,7 +69,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {

public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
.add("Id").add("Name").add("ItemName").add("ItemValue")
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
.build();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -349,7 +349,8 @@ private void checkProperties(Map<String, String> properties) throws UserExceptio
throw new UserException("policy's priority can only between 0 ~ 100");
}
} catch (NumberFormatException e) {
throw new UserException("policy's priority must be a number, input value=" + priorityStr);
throw new UserException(
"invalid priority property value, it must be a number, input value=" + priorityStr);
}
}
}
Expand Down Expand Up @@ -448,6 +449,11 @@ public List<List<String>> getShowPolicyInfo() {
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}

public List<List<String>> getWorkloadSchedPolicyTvfInfo(TUserIdentity tcurrentUserIdentity) {
UserIdentity currentUserIdentity = UserIdentity.fromThrift(tcurrentUserIdentity);
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}

public class PolicyProcNode {
public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
Expand All @@ -460,54 +466,31 @@ public ProcResult fetchResult(UserIdentity currentUserIdentity) {
continue;
}

String pId = String.valueOf(policy.getId());
List<String> row = new ArrayList<>();
String pName = policy.getName();
row.add(String.valueOf(policy.getId()));
row.add(pName);

List<WorkloadConditionMeta> conditionList = policy.getConditionMetaList();
StringBuilder cmStr = new StringBuilder();
for (WorkloadConditionMeta cm : conditionList) {
List<String> condRow = new ArrayList<>();
condRow.add(pId);
condRow.add(pName);
condRow.add("condition");
condRow.add(cm.toString());
result.addRow(condRow);
cmStr.append(cm.toString()).append(";");
}
String retStr = cmStr.toString().toLowerCase();
row.add(retStr.substring(0, retStr.length() - 1));

List<WorkloadActionMeta> actionList = policy.getActionMetaList();
for (WorkloadActionMeta workloadActionMeta : actionList) {
List<String> actionRow = new ArrayList<>();
actionRow.add(pId);
actionRow.add(pName);
actionRow.add("action");
if (StringUtils.isEmpty(workloadActionMeta.actionArgs)) {
actionRow.add(workloadActionMeta.action.toString());
} else {
actionRow.add(workloadActionMeta.action + " " + workloadActionMeta.actionArgs);
}
result.addRow(actionRow);
StringBuilder actionStr = new StringBuilder();
for (WorkloadActionMeta am : actionList) {
actionStr.append(am.toString()).append(";");
}
String retStr2 = actionStr.toString().toLowerCase();
row.add(retStr2.substring(0, retStr2.length() - 1));

List<String> prioRow = new ArrayList<>();
prioRow.add(pId);
prioRow.add(pName);
prioRow.add("priority");
prioRow.add(String.valueOf(policy.getPriority()));
result.addRow(prioRow);

List<String> enabledRow = new ArrayList<>();
enabledRow.add(pId);
enabledRow.add(pName);
enabledRow.add("enabled");
enabledRow.add(String.valueOf(policy.isEnabled()));
result.addRow(enabledRow);


List<String> versionRow = new ArrayList<>();
versionRow.add(pId);
versionRow.add(pName);
versionRow.add("version");
versionRow.add(String.valueOf(policy.getVersion()));
result.addRow(versionRow);
row.add(String.valueOf(policy.getPriority()));
row.add(String.valueOf(policy.isEnabled()));
row.add(String.valueOf(policy.getVersion()));
result.addRow(row);
}
} finally {
readUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData
case QUERIES:
result = queriesMetadataResult(params, request);
break;
case WORKLOAD_SCHED_POLICY:
result = workloadSchedPolicyMetadataResult(params);
break;
default:
return errorResult("Metadata table params is not set.");
}
Expand Down Expand Up @@ -383,6 +386,33 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadat
return result;
}

private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMetadataTableRequestParams params) {
if (!params.isSetCurrentUserIdent()) {
return errorResult("current user ident is not set.");
}

TUserIdentity tcurrentUserIdentity = params.getCurrentUserIdent();
List<List<String>> workloadPolicyList = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
.getWorkloadSchedPolicyTvfInfo(tcurrentUserIdentity);
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<TRow> dataBatch = Lists.newArrayList();
for (List<String> policyRow : workloadPolicyList) {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action
trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
dataBatch.add(trow);
}

result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}

private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case QUERIES:
return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
case WORKLOAD_SCHED_POLICY:
return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
throw new AnalysisException("Unknown Metadata TableValuedFunction type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
return new GroupCommitTableValuedFunction(params);
case QueriesTableValuedFunction.NAME:
return new QueriesTableValuedFunction(params);
case WorkloadSchedPolicyTableValuedFunction.NAME:
return new WorkloadSchedPolicyTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}
Expand Down
Loading
Loading