Skip to content

Commit

Permalink
[Feature] check partition scan number limit in resource group
Browse files Browse the repository at this point in the history
Signed-off-by: kaijian.ding <[email protected]>
  • Loading branch information
kaijianding committed Dec 13, 2024
1 parent 4ba6bce commit 1606d6f
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 115 deletions.
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ResourceGroup {
public static final String BIG_QUERY_SCAN_ROWS_LIMIT = "big_query_scan_rows_limit";
public static final String BIG_QUERY_CPU_SECOND_LIMIT = "big_query_cpu_second_limit";
public static final String CONCURRENCY_LIMIT = "concurrency_limit";
public static final String PARTITION_SCAN_NUMBER_LIMIT_RULE = "partition_scan_number_limit_rule";
public static final String DEFAULT_RESOURCE_GROUP_NAME = "default_wg";
public static final String DISABLE_RESOURCE_GROUP_NAME = "disable_resource_group";
public static final String DEFAULT_MV_RESOURCE_GROUP_NAME = "default_mv_wg";
Expand Down Expand Up @@ -118,6 +119,9 @@ public ColumnMeta(Column column, BiFunction<ResourceGroup, ResourceGroupClassifi
new Column(SPILL_MEM_LIMIT_THRESHOLD, ScalarType.createVarchar(200)),
(rg, classifier) -> new DecimalFormat("#.##").format(
Objects.requireNonNullElse(rg.getSpillMemLimitThreshold(), 1.0) * 100) + "%"),
new ColumnMeta(
new Column(PARTITION_SCAN_NUMBER_LIMIT_RULE, ScalarType.createVarchar(1024)),
(rg, classifier) -> rg.getPartitionScanNumberLimitRule()),
new ColumnMeta(
new Column(GROUP_TYPE, ScalarType.createVarchar(200)),
(rg, classifier) -> rg.getResourceGroupType().name().substring("WG_".length()), false),
Expand Down Expand Up @@ -172,6 +176,8 @@ public ColumnMeta(Column column, BiFunction<ResourceGroup, ResourceGroupClassifi
private TWorkGroupType resourceGroupType;
@SerializedName(value = "version")
private long version;
@SerializedName(value = "partitionScanNumberLimitRule")
private String partitionScanNumberLimitRule;

public ResourceGroup() {
}
Expand All @@ -198,6 +204,14 @@ public void setVersion(long version) {
this.version = version;
}

public String getPartitionScanNumberLimitRule() {
return partitionScanNumberLimitRule;
}

public void setPartitionScanNumberLimitRule(String partitionScanNumberLimitRule) {
this.partitionScanNumberLimitRule = partitionScanNumberLimitRule;
}

public List<List<String>> show(boolean verbose) {
if (classifiers.isEmpty()) {
return Collections.singletonList(showClassifier(new ResourceGroupClassifier(), verbose));
Expand Down Expand Up @@ -261,6 +275,10 @@ public TWorkGroup toThrift() {
twg.setWorkgroup_type(resourceGroupType);
}

if (partitionScanNumberLimitRule != null) {
twg.setPartition_scan_number_limit_rule(partitionScanNumberLimitRule);
}

twg.setExclusive_cpu_cores(getNormalizedExclusiveCpuCores());

twg.setVersion(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException
wg.setSpillMemLimitThreshold(spillMemLimitThreshold);
}

String partitionScanNumberLimitRule = changedProperties.getPartitionScanNumberLimitRule();
if (partitionScanNumberLimitRule != null) {
wg.setPartitionScanNumberLimitRule(partitionScanNumberLimitRule);
}

// Type is guaranteed to be immutable during the analyzer phase.
TWorkGroupType workGroupType = changedProperties.getResourceGroupType();
Preconditions.checkState(workGroupType == null);
Expand Down
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.VariableExpr;
import com.starrocks.authentication.UserProperty;
import com.starrocks.cluster.ClusterNamespace;
Expand Down Expand Up @@ -238,6 +239,10 @@ public class ConnectContext {

private boolean relationAliasCaseInsensitive = false;

private boolean isExplain = false;

protected Map<Long, TableName> resolvedTables = Maps.newHashMap();

private final Map<String, PrepareStmtContext> preparedStmtCtxs = Maps.newHashMap();

private UUID sessionId;
Expand Down Expand Up @@ -919,6 +924,14 @@ public boolean isRelationAliasCaseInsensitive() {
return relationAliasCaseInsensitive;
}

public void setExplain(boolean explain) {
isExplain = explain;
}

public boolean isExplain() {
return isExplain;
}

public void setForwardTimes(int forwardTimes) {
this.forwardTimes = forwardTimes;
}
Expand Down Expand Up @@ -1209,6 +1222,14 @@ public void cleanTemporaryTable() {
}
}

public Map<Long, TableName> getResolvedTables() {
return resolvedTables;
}

public void addResolvedTable(TableName tableName, long tableId) {
resolvedTables.putIfAbsent(tableId, tableName);
}

// We can not make sure the set variables are all valid. Even if some variables are invalid, we should let user continue
// to execute SQL.
public void updateByUserProperty(UserProperty userProperty) {
Expand Down
46 changes: 46 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.reflect.TypeToken;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.analysis.TableName;
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.FsBroker;
import com.starrocks.catalog.ResourceGroup;
Expand All @@ -59,6 +61,8 @@
import com.starrocks.connector.exception.RemoteFileNotFoundException;
import com.starrocks.datacache.DataCacheSelectMetrics;
import com.starrocks.mysql.MysqlCommand;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.planner.OlapScanNode;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.PlanFragmentId;
import com.starrocks.planner.ResultSink;
Expand Down Expand Up @@ -102,6 +106,7 @@
import com.starrocks.thrift.TTabletCommitInfo;
import com.starrocks.thrift.TTabletFailInfo;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -529,6 +534,7 @@ public List<ScanNode> getScanNodes() {

@Override
public void startScheduling(ScheduleOption option) throws Exception {
checkPartitionScanNumberLimit();
try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "Pending")) {
QueryQueueManager.getInstance().maybeWait(connectContext, this);
}
Expand Down Expand Up @@ -1334,4 +1340,44 @@ private void execShortCircuit() throws Exception {
public ResultReceiver getReceiver() {
return receiver;
}

private void checkPartitionScanNumberLimit() throws StarRocksException {
if (connectContext.isExplain()
&& !connectContext.getSessionVariable().isCheckPartitionScanNumberLimitWhenExplain()) {
return;
}
if (!connectContext.getSessionVariable().isCheckPartitionScanNumberLimit()) {
return;
}
TWorkGroup workGroup = jobSpec.getResourceGroup();
if (workGroup == null) {
return;
}
if (workGroup.getPartition_scan_number_limit_rule() == null) {
return;
}
Map<String, Integer> rule = GsonUtils.GSON.fromJson(workGroup.getPartition_scan_number_limit_rule(),
new TypeToken<Map<String, Integer>>() {
}.getType());
for (ScanNode scanNode : jobSpec.getScanNodes()) {
if (!(scanNode instanceof OlapScanNode)) {
continue;
}
TableName tableName =
connectContext.getResolvedTables().get(((OlapScanNode) scanNode).getOlapTable().getId());
if (tableName == null) {
continue;
}
String tblName = tableName.getDb() + "." + tableName.getTbl();
Integer limit = rule.get(tblName);
if (limit == null) {
continue;
}
if (((OlapScanNode) scanNode).getSelectedPartitionIds().size() > limit) {
throw new StarRocksException(tblName + " scans more than " + limit +
" partition(s), which violates the limit defined in partition_scan_number_limit_rule in resource" +
" group " + workGroup.getName());
}
}
}
}
28 changes: 28 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,12 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String ENABLE_WAIT_DEPENDENT_EVENT = "enable_wait_dependent_event";

public static final String CHECK_PARTITION_SCAN_NUMBER_LIMIT
= "check_partition_scan_number_limit";

public static final String CHECK_PARTITION_SCAN_NUMBER_LIMIT_WHEN_EXPLAIN
= "check_partition_scan_number_limit_when_explain";

public static final String ENABLE_PHASED_SCHEDULER = "enable_phased_scheduler";
public static final String PHASED_SCHEDULER_MAX_CONCURRENCY = "phased_scheduler_max_concurrency";

Expand Down Expand Up @@ -2435,6 +2441,12 @@ public int getExprChildrenLimit() {
return exprChildrenLimit;
}

@VarAttr(name = CHECK_PARTITION_SCAN_NUMBER_LIMIT)
private boolean checkPartitionScanNumberLimit = true;

@VarAttr(name = CHECK_PARTITION_SCAN_NUMBER_LIMIT_WHEN_EXPLAIN)
private boolean checkPartitionScanNumberLimitWhenExplain = false;

public void setExprChildrenLimit(int exprChildrenLimit) {
this.exprChildrenLimit = exprChildrenLimit;
}
Expand Down Expand Up @@ -4452,6 +4464,22 @@ public boolean isEnableRewriteUnnestBitmapToArray() {
return enableRewriteUnnestBitmapToArray;
}

public boolean isCheckPartitionScanNumberLimit() {
return checkPartitionScanNumberLimit;
}

public void setCheckPartitionScanNumberLimit(boolean checkPartitionScanNumberLimit) {
this.checkPartitionScanNumberLimit = checkPartitionScanNumberLimit;
}

public boolean isCheckPartitionScanNumberLimitWhenExplain() {
return checkPartitionScanNumberLimitWhenExplain;
}

public void setCheckPartitionScanNumberLimitWhenExplain(boolean checkPartitionScanNumberLimitWhenExplain) {
this.checkPartitionScanNumberLimitWhenExplain = checkPartitionScanNumberLimitWhenExplain;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
boolean executeInFe = !isExplainAnalyze && !isSchedulerExplain && !isOutfileQuery
&& canExecuteInFe(context, execPlan.getPhysicalPlan());

context.setExplain(parsedStmt.isExplain());
if (isExplainAnalyze) {
context.getSessionVariable().setEnableProfile(true);
context.getSessionVariable().setEnableAsyncProfile(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,7 @@ public Table resolveTable(TableRelation tableRelation) {
}
}

session.addResolvedTable(tableName, table.getId());
return table;
} catch (AnalysisException e) {
throw new SemanticException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.sql.analyzer;

import com.google.common.base.Splitter;
import com.google.common.reflect.TypeToken;
import com.starrocks.analysis.BinaryPredicate;
import com.starrocks.analysis.BinaryType;
import com.starrocks.analysis.Expr;
Expand All @@ -25,6 +26,7 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ResourceGroup;
import com.starrocks.catalog.ResourceGroupClassifier;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.BackendResourceStat;
import com.starrocks.thrift.TWorkGroupType;
Expand Down Expand Up @@ -274,6 +276,19 @@ public static void analyzeProperties(ResourceGroup resourceGroup, Map<String, St
"but it is set to `%s`", e.getKey(), e.getValue()));
}

if (key.equalsIgnoreCase(ResourceGroup.PARTITION_SCAN_NUMBER_LIMIT_RULE)) {
try {
// validate the rule
Map<String, Integer> ignored = GsonUtils.GSON.fromJson(value,
new TypeToken<Map<String, Integer>>() {
}.getType());
resourceGroup.setPartitionScanNumberLimitRule(value);
} catch (Exception ignored) {
throw new SemanticException("Not valid partition scan number limit rule json");
}
continue;
}

throw new SemanticException("Unknown property: " + key);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ public void analyze() {
changedProperties.getBigQueryCpuSecondLimit() == null &&
changedProperties.getBigQueryMemLimit() == null &&
changedProperties.getBigQueryScanRowsLimit() == null &&
changedProperties.getPartitionScanNumberLimitRule() == null &&
changedProperties.getSpillMemLimitThreshold() == null) {
throw new SemanticException("At least one of ('cpu_weight','exclusive_cpu_cores','mem_limit'," +
"'max_cpu_cores','concurrency_limit','big_query_mem_limit', 'big_query_scan_rows_limit'," +
"'big_query_cpu_second_limit','spill_mem_limit_threshold') " +
"'big_query_cpu_second_limit','spill_mem_limit_threshold','partition_scan_number_limit_rule') " +
"should be specified");
}
}
Expand Down
Loading

0 comments on commit 1606d6f

Please sign in to comment.