Skip to content

Commit

Permalink
[Enhancement] enhance delete pruning for all kinds of partitions (Sta…
Browse files Browse the repository at this point in the history
…rRocks#55400)

Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork authored Jan 26, 2025
1 parent 1b85e9d commit 4ae63bd
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 189 deletions.
217 changes: 45 additions & 172 deletions fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,28 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.BinaryPredicate;
import com.starrocks.analysis.BinaryType;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.DecimalLiteral;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.FunctionCallExpr;
import com.starrocks.analysis.InPredicate;
import com.starrocks.analysis.IsNullPredicate;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.analysis.NullLiteral;
import com.starrocks.analysis.Predicate;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ExpressionRangePartitionInfoV2;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.MaterializedIndexMeta;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
Expand All @@ -83,7 +72,6 @@
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.ListComparator;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.concurrent.lock.LockType;
Expand All @@ -96,31 +84,32 @@
import com.starrocks.persist.metablock.SRMetaBlockID;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockWriter;
import com.starrocks.planner.PartitionColumnFilter;
import com.starrocks.planner.RangePartitionPruner;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.QueryState;
import com.starrocks.qe.QueryStateException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.service.FrontendOptions;
import com.starrocks.sql.StatementPlanner;
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.DeleteAnalyzer;
import com.starrocks.sql.ast.DeleteStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.transaction.RunningTxnExceedException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionState.TxnCoordinator;
import com.starrocks.transaction.TransactionState.TxnSourceType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.ResolverStyle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -241,28 +230,11 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
Preconditions.checkState(partitionNames != null);
boolean noPartitionSpecified = partitionNames.isEmpty();
if (noPartitionSpecified) {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.isRangePartition()) {
partitionNames = extractPartitionNamesByCondition(olapTable, conditions);
if (partitionNames.isEmpty()) {
LOG.info("The delete statement [{}] prunes all partitions",
stmt.getOrigStmt().originStmt);
return null;
}
} else if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// this is a unpartitioned table, use table name as partition name
partitionNames.add(olapTable.getName());
} else if (partitionInfo.getType() == PartitionType.LIST) {
// TODO: support list partition prune
ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo;
List<Long> partitionIds = listPartitionInfo.getPartitionIds(false);
if (partitionIds.isEmpty()) {
return null;
}
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
partitionNames.add(partition.getName());
}
partitionNames = partitionPruneForDelete(stmt, olapTable);

if (partitionNames.isEmpty()) {
LOG.info("The delete statement [{}] prunes all partitions", stmt.getOrigStmt().originStmt);
return null;
}
}

Expand Down Expand Up @@ -324,140 +296,41 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
}

@VisibleForTesting
public List<String> extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable)
throws DdlException, AnalysisException {
return extractPartitionNamesByCondition(olapTable, stmt.getDeleteConditions());
public List<String> extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable) throws DdlException {
return partitionPruneForDelete(stmt, olapTable);
}

public List<String> extractPartitionNamesByCondition(OlapTable olapTable, List<Predicate> conditions)
throws DdlException, AnalysisException {
List<String> partitionNames = Lists.newArrayList();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
Map<String, PartitionColumnFilter> columnFilters = extractColumnFilter(olapTable,
rangePartitionInfo, conditions);
Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false);
if (columnFilters.isEmpty()) {
partitionNames.addAll(olapTable.getPartitionNames());
} else {
RangePartitionPruner pruner = new RangePartitionPruner(keyRangeById,
rangePartitionInfo.getPartitionColumns(olapTable.getIdToColumn()), columnFilters);
Collection<Long> selectedPartitionIds = pruner.prune();

if (selectedPartitionIds == null) {
partitionNames.addAll(olapTable.getPartitionNames());
} else {
for (Long partitionId : selectedPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
partitionNames.add(partition.getName());
}
}
}
return partitionNames;
}

private Map<String, PartitionColumnFilter> extractColumnFilter(Table table, RangePartitionInfo rangePartitionInfo,
List<Predicate> conditions)
throws DdlException, AnalysisException {
Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns(table.getIdToColumn());
List<Predicate> deleteConditions = conditions;
Map<String, Column> nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (Column column : table.getBaseSchema()) {
nameToColumn.put(column.getName(), column);
}
for (Predicate condition : deleteConditions) {
SlotRef slotRef = (SlotRef) condition.getChild(0);
String columnName = slotRef.getColumnName();

// filter condition is not partition column;
if (partitionColumns.stream().noneMatch(e -> e.getName().equals(columnName))) {
continue;
}

if (!nameToColumn.containsKey(columnName)) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName());
}
if (condition instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
LiteralExpr literalExpr = (LiteralExpr) binaryPredicate.getChild(1);
Column column = nameToColumn.get(columnName);

if (table.isExprPartitionTable() && !rangePartitionInfo.isAutomaticPartition()) {
ExpressionRangePartitionInfoV2 expressionRangePartitionInfoV2 =
(ExpressionRangePartitionInfoV2) rangePartitionInfo;
Expr partitionExpr = expressionRangePartitionInfoV2.getPartitionExprs(table.getIdToColumn()).get(0);
List<FunctionCallExpr> functionCallExprs = new ArrayList<>();
partitionExpr.collect((com.google.common.base.Predicate<Expr>) arg -> arg instanceof FunctionCallExpr,
functionCallExprs);
FunctionCallExpr functionCallExpr = functionCallExprs.get(0);
String funcName = functionCallExpr.getFnName().getFunction();
if (funcName.equalsIgnoreCase(FunctionSet.STR2DATE)) {
String format = ((StringLiteral) functionCallExpr.getChild(1)).getValue();
DateTimeFormatterBuilder builder = DateUtils.unixDatetimeFormatBuilder(format, false);
LocalDate ld = LocalDate.from(builder.toFormatter().withResolverStyle(ResolverStyle.STRICT).parse(
StringUtils.strip(literalExpr.getStringValue(), "\r\n\t ")));
literalExpr = new DateLiteral(ld.atTime(0, 0, 0), Type.DATE);
} else if (funcName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME)) {
literalExpr = LiteralExpr.create(DateUtils.formatTimestampInSeconds(
Long.parseLong(literalExpr.getStringValue())), Type.DATETIME);
} else if (funcName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME_MS)) {
literalExpr = LiteralExpr.create(DateUtils.formatTimeStampInMill(
Long.parseLong(literalExpr.getStringValue()), TimeUtils.getSystemTimeZone().toZoneId()),
Type.DATETIME);
}
} else {
literalExpr = LiteralExpr.create(literalExpr.getStringValue(),
Objects.requireNonNull(Type.fromPrimitiveType(column.getPrimitiveType())));
}

PartitionColumnFilter filter = columnFilters.getOrDefault(slotRef.getColumnName(),
new PartitionColumnFilter());
switch (binaryPredicate.getOp()) {
case EQ:
filter.setLowerBound(literalExpr, true);
filter.setUpperBound(literalExpr, true);
break;
case LE:
filter.setUpperBound(literalExpr, true);
filter.lowerBoundInclusive = true;
break;
case LT:
filter.setUpperBound(literalExpr, false);
filter.lowerBoundInclusive = true;
break;
case GE:
filter.setLowerBound(literalExpr, true);
break;
case GT:
filter.setLowerBound(literalExpr, false);
break;
default:
break;
}
columnFilters.put(slotRef.getColumnName(), filter);
} else if (condition instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) condition;
if (inPredicate.isNotIn()) {
continue;
}
List<LiteralExpr> list = Lists.newArrayList();
Column column = nameToColumn.get(columnName);
for (int i = 1; i < inPredicate.getChildren().size(); i++) {
LiteralExpr literalExpr = (LiteralExpr) inPredicate.getChild(i);
literalExpr = LiteralExpr.create(literalExpr.getStringValue(),
Objects.requireNonNull(Type.fromPrimitiveType(column.getPrimitiveType())));
list.add(literalExpr);
}

PartitionColumnFilter filter = columnFilters.getOrDefault(slotRef.getColumnName(),
new PartitionColumnFilter());
filter.setInPredicateLiterals(list);
columnFilters.put(slotRef.getColumnName(), filter);
/**
* Construct a fake sql then leverage the optimizer to prune partitions
*
* @return pruned partitions with delete conditions
*/
private List<String> partitionPruneForDelete(DeleteStmt stmt, OlapTable table) {
String tableName = stmt.getTableName().toSql();
String predicate = stmt.getWherePredicate().toSql();
String fakeSql = String.format("SELECT * FROM %s WHERE %s", tableName, predicate);
PhysicalOlapScanOperator physicalOlapScanOperator;
try {
List<StatementBase> parse = SqlParser.parse(fakeSql, ConnectContext.get().getSessionVariable());
StatementBase selectStmt = parse.get(0);
Analyzer.analyze(selectStmt, ConnectContext.get());
ExecPlan plan = StatementPlanner.plan(selectStmt, ConnectContext.get());
List<PhysicalOlapScanOperator> physicalOlapScanOperators =
Utils.extractPhysicalOlapScanOperator(plan.getPhysicalPlan());
// it's supposed to be empty set
if (CollectionUtils.isEmpty(physicalOlapScanOperators)) {
return Lists.newArrayList();
}

physicalOlapScanOperator = physicalOlapScanOperators.get(0);
} catch (Exception e) {
LOG.warn("failed to do partition pruning for delete {}", stmt.toString(), e);
return Lists.newArrayList(table.getVisiblePartitionNames());
}
return columnFilters;
List<Long> selectedPartitionId = physicalOlapScanOperator.getSelectedPartitionId();
return ListUtils.emptyIfNull(selectedPartitionId)
.stream()
.map(x -> table.getPartition(x).getName())
.collect(Collectors.toList());
}

/**
Expand Down
Loading

0 comments on commit 4ae63bd

Please sign in to comment.