Skip to content

Commit

Permalink
[improvement](executor) Add tvf and regression test for Workload Sche…
Browse files Browse the repository at this point in the history
…duler (#28733)

1 Add select workload schedule policy tvf
2 Add reg test
  • Loading branch information
wangbo authored Dec 22, 2023
1 parent c72ad9b commit 012e667
Show file tree
Hide file tree
Showing 18 changed files with 637 additions and 45 deletions.
20 changes: 20 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::WORKLOAD_GROUPS:
RETURN_IF_ERROR(_build_workload_groups_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::WORKLOAD_SCHED_POLICY:
RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::CATALOGS:
RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request));
break;
Expand Down Expand Up @@ -379,6 +382,23 @@ Status VMetaScanner::_build_workload_groups_metadata_request(
return Status::OK();
}

Status VMetaScanner::_build_workload_sched_policy_metadata_request(
const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_workload_sched_policy_metadata_request";

// create request
request->__set_cluster_name("");
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY);
metadata_table_params.__set_current_user_ident(_user_identity);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_catalogs_metadata_request";
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class VMetaScanner : public VScanner {
TFetchSchemaTableDataRequest* request);
Status _build_workload_groups_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_workload_sched_policy_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,38 @@ public List<WorkloadActionMeta> getActions() {
public Map<String, String> getProperties() {
return properties;
}

@Override
public String toSql() {
String str = "";
str = str + "CREAYE ";
str = str + "WORKLOAD SCHEDULE POLICY " + policyName + " ";

str = str + " CONDITIONS( ";
if (conditions != null) {
for (WorkloadConditionMeta wcm : conditions) {
str += wcm.toString() + ",";
}
}
str = str.substring(0, str.length() - 1);
str = str + ")";

str = str + " ACTIONS( ";
if (actions != null) {
for (WorkloadActionMeta wam : actions) {
str = str + wam.toString() + ",";
}
}
str = str.substring(0, str.length() - 1);
str = str + ")";

str = str + " PROPERTIES(";
for (Map.Entry<String, String> entry : properties.entrySet()) {
str = str + "\"" + entry.getKey() + "\"" + "=" + "\"" + entry.getValue() + "\",";
}
str = str.substring(0, str.length() - 1);
str = str + ")";

return str;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,19 @@ public Long getWorkloadGroupIdByName(String name) {
}
}

public String getWorkloadGroupNameById(Long id) {
readLock();
try {
WorkloadGroup wg = idToWorkloadGroup.get(id);
if (wg == null) {
return null;
}
return wg.getName();
} finally {
readUnlock();
}
}

// for ut
public Map<String, WorkloadGroup> getNameToWorkloadGroup() {
return nameToWorkloadGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.doris.resource.workloadschedpolicy;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;

import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;

public class WorkloadActionMeta {

Expand All @@ -44,4 +46,18 @@ static WorkloadActionType getWorkloadActionType(String strType) throws UserExcep
}
throw new UserException("invalid action type " + strType);
}

public String toString() {
if (StringUtils.isEmpty(actionArgs)) {
return action.toString();
} else {
String retActionArgs = actionArgs;
if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(action)) {
retActionArgs = Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroupNameById(Long.valueOf(actionArgs));
}
retActionArgs = retActionArgs == null ? "-1" : retActionArgs;
return action + " \"" + retActionArgs + "\"";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ static WorkloadConditionOperator getOperator(String op) throws UserException {
}
}

// used for select tvf
static String getOperatorStr(WorkloadConditionOperator op) {
switch (op) {
case EQUAL:
return "=";
case GREATER:
return ">";
case GREATER_EQUAL:
return ">=";
case LESS:
return "<";
case LESS_EQUAl:
return "<=";
default:
throw new RuntimeException("unexpected compare operator " + op);
}
}

static boolean compareInteger(WorkloadConditionOperator operator, long firstArgs, long secondArgs) {
switch (operator) {
case EQUAL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ private static WorkloadMetricType getMetricType(String metricStr) throws UserExc
}

public String toString() {
return metricName + " " + op + " " + value;
return metricName + " " + WorkloadConditionCompareUtils.getOperatorStr(op) + " " + value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public class WorkloadQueryInfo {
String queryId = null;
TUniqueId tUniqueId = null;
ConnectContext context = null;
Map<WorkloadMetricType, String> metricMap;
public Map<WorkloadMetricType, String> metricMap;
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable {
private List<WorkloadCondition> workloadConditionList;
private List<WorkloadAction> workloadActionList;

// for ut
public WorkloadSchedPolicy() {
}

// for ut
public void setWorkloadConditionList(List<WorkloadCondition> workloadConditionList) {
this.workloadConditionList = workloadConditionList;
}

public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloadConditionList,
List<WorkloadAction> workloadActionList, Map<String, String> properties) throws UserException {
this.id = id;
Expand All @@ -77,7 +86,7 @@ public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> workloa
// return false,
// 1 metric not match
// 2 condition value not match query info's value
boolean isMatch(WorkloadQueryInfo queryInfo) {
public boolean isMatch(WorkloadQueryInfo queryInfo) {
for (WorkloadCondition condition : workloadConditionList) {
WorkloadMetricType metricType = condition.getMetricType();
String value = queryInfo.metricMap.get(metricType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TUserIdentity;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -69,7 +69,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable {

public static final ImmutableList<String> WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
= new ImmutableList.Builder<String>()
.add("Id").add("Name").add("ItemName").add("ItemValue")
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
.build();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -349,7 +349,8 @@ private void checkProperties(Map<String, String> properties) throws UserExceptio
throw new UserException("policy's priority can only between 0 ~ 100");
}
} catch (NumberFormatException e) {
throw new UserException("policy's priority must be a number, input value=" + priorityStr);
throw new UserException(
"invalid priority property value, it must be a number, input value=" + priorityStr);
}
}
}
Expand Down Expand Up @@ -448,6 +449,11 @@ public List<List<String>> getShowPolicyInfo() {
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}

public List<List<String>> getWorkloadSchedPolicyTvfInfo(TUserIdentity tcurrentUserIdentity) {
UserIdentity currentUserIdentity = UserIdentity.fromThrift(tcurrentUserIdentity);
return policyProcNode.fetchResult(currentUserIdentity).getRows();
}

public class PolicyProcNode {
public ProcResult fetchResult(UserIdentity currentUserIdentity) {
BaseProcResult result = new BaseProcResult();
Expand All @@ -460,54 +466,31 @@ public ProcResult fetchResult(UserIdentity currentUserIdentity) {
continue;
}

String pId = String.valueOf(policy.getId());
List<String> row = new ArrayList<>();
String pName = policy.getName();
row.add(String.valueOf(policy.getId()));
row.add(pName);

List<WorkloadConditionMeta> conditionList = policy.getConditionMetaList();
StringBuilder cmStr = new StringBuilder();
for (WorkloadConditionMeta cm : conditionList) {
List<String> condRow = new ArrayList<>();
condRow.add(pId);
condRow.add(pName);
condRow.add("condition");
condRow.add(cm.toString());
result.addRow(condRow);
cmStr.append(cm.toString()).append(";");
}
String retStr = cmStr.toString().toLowerCase();
row.add(retStr.substring(0, retStr.length() - 1));

List<WorkloadActionMeta> actionList = policy.getActionMetaList();
for (WorkloadActionMeta workloadActionMeta : actionList) {
List<String> actionRow = new ArrayList<>();
actionRow.add(pId);
actionRow.add(pName);
actionRow.add("action");
if (StringUtils.isEmpty(workloadActionMeta.actionArgs)) {
actionRow.add(workloadActionMeta.action.toString());
} else {
actionRow.add(workloadActionMeta.action + " " + workloadActionMeta.actionArgs);
}
result.addRow(actionRow);
StringBuilder actionStr = new StringBuilder();
for (WorkloadActionMeta am : actionList) {
actionStr.append(am.toString()).append(";");
}
String retStr2 = actionStr.toString().toLowerCase();
row.add(retStr2.substring(0, retStr2.length() - 1));

List<String> prioRow = new ArrayList<>();
prioRow.add(pId);
prioRow.add(pName);
prioRow.add("priority");
prioRow.add(String.valueOf(policy.getPriority()));
result.addRow(prioRow);

List<String> enabledRow = new ArrayList<>();
enabledRow.add(pId);
enabledRow.add(pName);
enabledRow.add("enabled");
enabledRow.add(String.valueOf(policy.isEnabled()));
result.addRow(enabledRow);


List<String> versionRow = new ArrayList<>();
versionRow.add(pId);
versionRow.add(pName);
versionRow.add("version");
versionRow.add(String.valueOf(policy.getVersion()));
result.addRow(versionRow);
row.add(String.valueOf(policy.getPriority()));
row.add(String.valueOf(policy.isEnabled()));
row.add(String.valueOf(policy.getVersion()));
result.addRow(row);
}
} finally {
readUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData
case QUERIES:
result = queriesMetadataResult(params, request);
break;
case WORKLOAD_SCHED_POLICY:
result = workloadSchedPolicyMetadataResult(params);
break;
default:
return errorResult("Metadata table params is not set.");
}
Expand Down Expand Up @@ -383,6 +386,33 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadat
return result;
}

private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMetadataTableRequestParams params) {
if (!params.isSetCurrentUserIdent()) {
return errorResult("current user ident is not set.");
}

TUserIdentity tcurrentUserIdentity = params.getCurrentUserIdent();
List<List<String>> workloadPolicyList = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
.getWorkloadSchedPolicyTvfInfo(tcurrentUserIdentity);
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<TRow> dataBatch = Lists.newArrayList();
for (List<String> policyRow : workloadPolicyList) {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition
trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action
trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
dataBatch.add(trow);
}

result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}

private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case QUERIES:
return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
case WORKLOAD_SCHED_POLICY:
return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
throw new AnalysisException("Unknown Metadata TableValuedFunction type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
return new GroupCommitTableValuedFunction(params);
case QueriesTableValuedFunction.NAME:
return new QueriesTableValuedFunction(params);
case WorkloadSchedPolicyTableValuedFunction.NAME:
return new WorkloadSchedPolicyTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}
Expand Down
Loading

0 comments on commit 012e667

Please sign in to comment.