Skip to content

Commit

Permalink
[Enhancement] Append LIST partition info when list table partitions b…
Browse files Browse the repository at this point in the history
…y rest api.

Signed-off-by: plotor <[email protected]>
  • Loading branch information
plotor committed Dec 4, 2024
1 parent e7ad30a commit ef6b979
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

package com.starrocks.http.rest;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.starrocks.analysis.TableName;
Expand Down Expand Up @@ -127,7 +126,6 @@ protected void executeWithoutPassword(BaseRequest request, BaseResponse response
|| Strings.isNullOrEmpty(tableName)) {
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST, "{database}/{table} must be selected");
}
String sql;
if (Strings.isNullOrEmpty(postContent)) {
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST,
"POST body must contains [sql] root object");
Expand All @@ -139,7 +137,7 @@ protected void executeWithoutPassword(BaseRequest request, BaseResponse response
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST,
"malformed json [ " + postContent + " ]");
}
sql = jsonObject.optString("sql");
String sql = jsonObject.optString("sql");
if (Strings.isNullOrEmpty(sql)) {
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST,
"POST body must contains [sql] root object");
Expand Down Expand Up @@ -180,11 +178,11 @@ protected void executeWithoutPassword(BaseRequest request, BaseResponse response
resultMap.put("status", e.getCode().code());
resultMap.put("exception", e.getMessage());
}
ObjectMapper mapper = new ObjectMapper();

try {
String result = mapper.writeValueAsString(resultMap);
// send result with extra information
response.setContentType("application/json");
response.setContentType(JSON_CONTENT_TYPE);
response.getContent().append(result);
sendResult(request, response,
HttpResponseStatus.valueOf(Integer.parseInt(String.valueOf(resultMap.get("status")))));
Expand Down Expand Up @@ -222,9 +220,10 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
context.getSessionVariable().setSingleNodeExecPlan(false);
context.getSessionVariable().setSqlSelectLimit(limit);
} catch (Exception e) {
LOG.error("error occurred when optimizing queryId: {}", context.getQueryId(), e);
throw new StarRocksHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"The Sql is invalid");
LOG.error("Get query plan for sql[{}] error, queryId: {}", sql, context.getQueryId(), e);
throw new StarRocksHttpException(
HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Invalid SQL: " + sql);
}

// only process select semantic
Expand All @@ -245,28 +244,28 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
if (AnalyzerUtils.collectAllTable(statementBase).size() != 1) {
if (stmt.getRelation() instanceof TableRelation) {
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must have only one table");
"Select statement must have only one table: " + sql);
}
}

if (stmt.getRelation() instanceof SubqueryRelation) {
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must not embed another statement");
"Select statement must not embed another statement: " + sql);
}

// check consistent http requested resource with sql referenced
// if consistent in this way, can avoid check privilege
TableName tableAndDb = stmt.getRelation().getResolveTableName();
if (!(tableAndDb.getDb().equals(requestDb) && tableAndDb.getTbl().equals(requestTable))) {
throw new StarRocksHttpException(HttpResponseStatus.BAD_REQUEST,
"requested database and table must consistent with sql: request [ "
+ requestDb + "." + requestTable + "]" + "and sql [" + tableAndDb.toString() + "]");
"Requested database and table must consistent with sql: request [ "
+ requestDb + "." + requestTable + "]" + "and sql [" + tableAndDb + "]");
}

if (execPlan == null) {
LOG.error("plan is null for queryId: {}", context.getQueryId());
LOG.error("Null exec plan for sql[{}], queryId: {}", sql, context.getQueryId());
throw new StarRocksHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"The Sql is invalid");
"Invalid SQL: " + sql);
}

if (execPlan.getScanNodes().isEmpty() && FeConstants.enablePruneEmptyOutputScan) {
Expand All @@ -280,15 +279,15 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
// in this way, just retrieve only one scannode
if (execPlan.getScanNodes().size() != 1) {
throw new StarRocksHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Planner should plan just only one ScanNode but found [ " + execPlan.getScanNodes().size() + "]");
"Planner should plan just only 1 ScanNode but found " + execPlan.getScanNodes().size() + ", sql is " + sql);
}
List<TScanRangeLocations> scanRangeLocations =
execPlan.getScanNodes().get(0).getScanRangeLocations(0);
// acquire the PlanFragment which the executable template
List<PlanFragment> fragments = execPlan.getFragments();
if (fragments.size() != 1) {
throw new StarRocksHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"Planner should plan just only one PlanFragment but found [ " + fragments.size() + "]");
"Planner should plan just only 1 PlanFragment but found " + fragments.size() + ", sql is " + sql);
}

TQueryPlanInfo tQueryPlanInfo = new TQueryPlanInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.gson.annotations.SerializedName;
import com.staros.client.StarClientException;
import com.staros.proto.ShardInfo;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
Expand All @@ -28,6 +29,7 @@
import com.starrocks.lake.LakeTablet;
import com.starrocks.lake.StarOSAgent;
import com.starrocks.load.PartitionUtils;
import com.starrocks.load.PartitionUtils.RangePartitionBoundary;
import com.starrocks.server.GlobalStateMgr;
import org.apache.commons.collections4.CollectionUtils;

Expand Down Expand Up @@ -119,6 +121,9 @@ public static class PartitionView {
@SerializedName("endKeys")
private List<Object> endKeys;

@SerializedName("inKeys")
private List<List<Object>> inKeys;

@SerializedName("storagePath")
private String storagePath;

Expand All @@ -133,7 +138,8 @@ public PartitionView() {
*/
public static PartitionView createFrom(PartitionInfo partitionInfo, Partition partition) {
PartitionView pvo = new PartitionView();
pvo.setId(partition.getId());
long partitionId = partition.getId();
pvo.setId(partitionId);
pvo.setName(partition.getName());

Optional.ofNullable(partition.getDistributionInfo()).ifPresent(distributionInfo -> {
Expand All @@ -157,14 +163,23 @@ public static PartitionView createFrom(PartitionInfo partitionInfo, Partition pa
break;
case RANGE:
RangePartitionInfo rpi = (RangePartitionInfo) partitionInfo;
PartitionUtils.RangePartitionBoundary boundary =
PartitionUtils.calRangePartitionBoundary(rpi.getRange(partition.getId()));
RangePartitionBoundary boundary =
PartitionUtils.calRangePartitionBoundary(rpi.getRange(partitionId));
pvo.setMinPartition(boundary.isMinPartition());
pvo.setMaxPartition(boundary.isMaxPartition());
pvo.setStartKeys(boundary.getStartKeys());
pvo.setEndKeys(boundary.getEndKeys());
break;
// LIST/EXPR_RANGE_V2
case LIST:
ListPartitionInfo lpi = (ListPartitionInfo) partitionInfo;
List<List<Object>> keys = PartitionUtils.calListPartitionKeys(
Optional.ofNullable(lpi.getMultiLiteralExprValues())
.map(exprVals -> exprVals.get(partitionId)).orElse(new ArrayList<>(0)),
Optional.ofNullable(lpi.getLiteralExprValues())
.map(exprVals -> exprVals.get(partitionId)).orElse(new ArrayList<>(0))
);
pvo.setInKeys(keys);
break;
default:
// TODO add more type support in the future
}
Expand Down Expand Up @@ -283,6 +298,14 @@ public void setEndKeys(List<Object> endKeys) {
this.endKeys = endKeys;
}

public List<List<Object>> getInKeys() {
return inKeys;
}

public void setInKeys(List<List<Object>> inKeys) {
this.inKeys = inKeys;
}

public String getStoragePath() {
return storagePath;
}
Expand Down
31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/PartitionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
import com.starrocks.sql.ast.DistributionDesc;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.StarRocksPlannerException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class PartitionUtils {
Expand Down Expand Up @@ -205,6 +208,34 @@ public static RangePartitionBoundary calRangePartitionBoundary(Range<PartitionKe
return new RangePartitionBoundary(isMinPartition, isMaxPartition, startKeys, endKeys);
}

public static List<List<Object>> calListPartitionKeys(List<List<LiteralExpr>> multiLiteralExprs,
List<LiteralExpr> literalExprs) {
List<List<Object>> keys = new ArrayList<>();
if (CollectionUtils.isNotEmpty(multiLiteralExprs)) {
for (List<LiteralExpr> exprs : multiLiteralExprs) {
keys.add(initItemOfInKeys(exprs));
}
}
if (CollectionUtils.isNotEmpty(literalExprs)) {
for (LiteralExpr expr : literalExprs) {
keys.add(initItemOfInKeys(Collections.singletonList(expr)));
}
}
return keys;
}

private static List<Object> initItemOfInKeys(List<LiteralExpr> exprs) {
return exprs.stream()
.filter(Objects::nonNull)
.map(PartitionUtils::exprValue)
.collect(Collectors.toList());
}

private static Object exprValue(LiteralExpr expr) {
return expr instanceof DateLiteral
? convertDateLiteralToNumber((DateLiteral) expr) : expr.getRealObjectValue();
}

// This is to be compatible with Spark Load Job formats for Date type.
// Because the historical version is serialized and deserialized with a special hash number for DateLiteral,
// special processing is also done here for DateLiteral to keep the historical version compatible.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.analysis.BrokerDesc;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.FunctionCallExpr;
import com.starrocks.analysis.LiteralExpr;
Expand Down Expand Up @@ -94,7 +93,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -393,19 +391,10 @@ private List<EtlPartition> initEtlListPartition(
// bucket num
int bucketNum = partition.getDistributionInfo().getBucketNum();
// list partition values
List<List<LiteralExpr>> multiValueList = multiLiteralExprValues.get(partitionId);
List<List<Object>> inKeys = Lists.newArrayList();
if (multiValueList != null && !multiValueList.isEmpty()) {
for (List<LiteralExpr> list : multiValueList) {
inKeys.add(initItemOfInKeys(list));
}
}
List<LiteralExpr> valueList = literalExprValues.get(partitionId);
if (valueList != null && !valueList.isEmpty()) {
for (LiteralExpr literalExpr : valueList) {
inKeys.add(initItemOfInKeys(Lists.newArrayList(literalExpr)));
}
}
List<List<Object>> inKeys = PartitionUtils.calListPartitionKeys(
multiLiteralExprValues.get(partitionId),
literalExprValues.get(partitionId)
);

for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
long physicalPartitionId = physicalPartition.getId();
Expand All @@ -415,20 +404,6 @@ private List<EtlPartition> initEtlListPartition(
return etlPartitions;
}

private List<Object> initItemOfInKeys(List<LiteralExpr> list) {
List<Object> curList = new ArrayList<>();
for (LiteralExpr literalExpr : list) {
Object keyValue;
if (literalExpr instanceof DateLiteral) {
keyValue = PartitionUtils.convertDateLiteralToNumber((DateLiteral) literalExpr);
} else {
keyValue = literalExpr.getRealObjectValue();
}
curList.add(keyValue);
}
return curList;
}

private List<EtlPartition> initEtlRangePartition(
List<String> partitionColumnRefs, OlapTable table, Set<Long> partitionIds) throws LoadException {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
Expand Down
Loading

0 comments on commit ef6b979

Please sign in to comment.