From 4ae63bdce78bad9d416101174b9cb6bd82981ae3 Mon Sep 17 00:00:00 2001 From: Murphy <96611012+murphyatwork@users.noreply.github.com> Date: Sun, 26 Jan 2025 20:36:49 +0800 Subject: [PATCH] [Enhancement] enhance delete pruning for all kinds of partitions (#55400) Signed-off-by: Murphy --- .../java/com/starrocks/load/DeleteMgr.java | 217 ++++-------------- .../com/starrocks/load/DeletePruneTest.java | 97 ++++++-- 2 files changed, 125 insertions(+), 189 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java index 5fdc61167838f8..26ab21bc70b9d5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java @@ -40,7 +40,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; import com.google.gson.annotations.SerializedName; @@ -48,31 +47,21 @@ import com.starrocks.analysis.BinaryType; import com.starrocks.analysis.DateLiteral; import com.starrocks.analysis.DecimalLiteral; -import com.starrocks.analysis.Expr; -import com.starrocks.analysis.FunctionCallExpr; import com.starrocks.analysis.InPredicate; import com.starrocks.analysis.IsNullPredicate; import com.starrocks.analysis.LiteralExpr; import com.starrocks.analysis.NullLiteral; import com.starrocks.analysis.Predicate; import com.starrocks.analysis.SlotRef; -import com.starrocks.analysis.StringLiteral; import com.starrocks.catalog.Column; import com.starrocks.catalog.Database; -import com.starrocks.catalog.ExpressionRangePartitionInfoV2; -import com.starrocks.catalog.FunctionSet; import com.starrocks.catalog.KeysType; -import com.starrocks.catalog.ListPartitionInfo; import com.starrocks.catalog.MaterializedIndex; import com.starrocks.catalog.MaterializedIndexMeta; import com.starrocks.catalog.OlapTable; import com.starrocks.catalog.Partition; -import com.starrocks.catalog.PartitionInfo; -import com.starrocks.catalog.PartitionKey; -import com.starrocks.catalog.PartitionType; import com.starrocks.catalog.PhysicalPartition; import com.starrocks.catalog.PrimitiveType; -import com.starrocks.catalog.RangePartitionInfo; import com.starrocks.catalog.Table; import com.starrocks.catalog.Type; import com.starrocks.common.AnalysisException; @@ -83,7 +72,6 @@ import com.starrocks.common.FeConstants; import com.starrocks.common.Pair; import com.starrocks.common.io.Writable; -import com.starrocks.common.util.DateUtils; import com.starrocks.common.util.ListComparator; import com.starrocks.common.util.TimeUtils; import com.starrocks.common.util.concurrent.lock.LockType; @@ -96,31 +84,32 @@ import com.starrocks.persist.metablock.SRMetaBlockID; import com.starrocks.persist.metablock.SRMetaBlockReader; import com.starrocks.persist.metablock.SRMetaBlockWriter; -import com.starrocks.planner.PartitionColumnFilter; -import com.starrocks.planner.RangePartitionPruner; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.QueryState; import com.starrocks.qe.QueryStateException; import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.WarehouseManager; import com.starrocks.service.FrontendOptions; +import com.starrocks.sql.StatementPlanner; +import com.starrocks.sql.analyzer.Analyzer; import com.starrocks.sql.analyzer.DeleteAnalyzer; import com.starrocks.sql.ast.DeleteStmt; +import com.starrocks.sql.ast.StatementBase; +import com.starrocks.sql.optimizer.Utils; +import com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator; +import com.starrocks.sql.parser.SqlParser; +import com.starrocks.sql.plan.ExecPlan; import com.starrocks.transaction.RunningTxnExceedException; import com.starrocks.transaction.TransactionState; import com.starrocks.transaction.TransactionState.TxnCoordinator; import com.starrocks.transaction.TransactionState.TxnSourceType; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.ListUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.time.LocalDate; -import java.time.format.DateTimeFormatterBuilder; -import java.time.format.ResolverStyle; -import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -241,28 +230,11 @@ private DeleteJob createJob(DeleteStmt stmt, List conditions, Databas Preconditions.checkState(partitionNames != null); boolean noPartitionSpecified = partitionNames.isEmpty(); if (noPartitionSpecified) { - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - if (partitionInfo.isRangePartition()) { - partitionNames = extractPartitionNamesByCondition(olapTable, conditions); - if (partitionNames.isEmpty()) { - LOG.info("The delete statement [{}] prunes all partitions", - stmt.getOrigStmt().originStmt); - return null; - } - } else if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { - // this is a unpartitioned table, use table name as partition name - partitionNames.add(olapTable.getName()); - } else if (partitionInfo.getType() == PartitionType.LIST) { - // TODO: support list partition prune - ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo; - List partitionIds = listPartitionInfo.getPartitionIds(false); - if (partitionIds.isEmpty()) { - return null; - } - for (Long partitionId : partitionIds) { - Partition partition = olapTable.getPartition(partitionId); - partitionNames.add(partition.getName()); - } + partitionNames = partitionPruneForDelete(stmt, olapTable); + + if (partitionNames.isEmpty()) { + LOG.info("The delete statement [{}] prunes all partitions", stmt.getOrigStmt().originStmt); + return null; } } @@ -324,140 +296,41 @@ private DeleteJob createJob(DeleteStmt stmt, List conditions, Databas } @VisibleForTesting - public List extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable) - throws DdlException, AnalysisException { - return extractPartitionNamesByCondition(olapTable, stmt.getDeleteConditions()); + public List extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable) throws DdlException { + return partitionPruneForDelete(stmt, olapTable); } - public List extractPartitionNamesByCondition(OlapTable olapTable, List conditions) - throws DdlException, AnalysisException { - List partitionNames = Lists.newArrayList(); - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; - Map columnFilters = extractColumnFilter(olapTable, - rangePartitionInfo, conditions); - Map> keyRangeById = rangePartitionInfo.getIdToRange(false); - if (columnFilters.isEmpty()) { - partitionNames.addAll(olapTable.getPartitionNames()); - } else { - RangePartitionPruner pruner = new RangePartitionPruner(keyRangeById, - rangePartitionInfo.getPartitionColumns(olapTable.getIdToColumn()), columnFilters); - Collection selectedPartitionIds = pruner.prune(); - - if (selectedPartitionIds == null) { - partitionNames.addAll(olapTable.getPartitionNames()); - } else { - for (Long partitionId : selectedPartitionIds) { - Partition partition = olapTable.getPartition(partitionId); - partitionNames.add(partition.getName()); - } - } - } - return partitionNames; - } - - private Map extractColumnFilter(Table table, RangePartitionInfo rangePartitionInfo, - List conditions) - throws DdlException, AnalysisException { - Map columnFilters = Maps.newHashMap(); - List partitionColumns = rangePartitionInfo.getPartitionColumns(table.getIdToColumn()); - List deleteConditions = conditions; - Map nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (Column column : table.getBaseSchema()) { - nameToColumn.put(column.getName(), column); - } - for (Predicate condition : deleteConditions) { - SlotRef slotRef = (SlotRef) condition.getChild(0); - String columnName = slotRef.getColumnName(); - - // filter condition is not partition column; - if (partitionColumns.stream().noneMatch(e -> e.getName().equals(columnName))) { - continue; - } - - if (!nameToColumn.containsKey(columnName)) { - ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName()); - } - if (condition instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) condition; - LiteralExpr literalExpr = (LiteralExpr) binaryPredicate.getChild(1); - Column column = nameToColumn.get(columnName); - - if (table.isExprPartitionTable() && !rangePartitionInfo.isAutomaticPartition()) { - ExpressionRangePartitionInfoV2 expressionRangePartitionInfoV2 = - (ExpressionRangePartitionInfoV2) rangePartitionInfo; - Expr partitionExpr = expressionRangePartitionInfoV2.getPartitionExprs(table.getIdToColumn()).get(0); - List functionCallExprs = new ArrayList<>(); - partitionExpr.collect((com.google.common.base.Predicate) arg -> arg instanceof FunctionCallExpr, - functionCallExprs); - FunctionCallExpr functionCallExpr = functionCallExprs.get(0); - String funcName = functionCallExpr.getFnName().getFunction(); - if (funcName.equalsIgnoreCase(FunctionSet.STR2DATE)) { - String format = ((StringLiteral) functionCallExpr.getChild(1)).getValue(); - DateTimeFormatterBuilder builder = DateUtils.unixDatetimeFormatBuilder(format, false); - LocalDate ld = LocalDate.from(builder.toFormatter().withResolverStyle(ResolverStyle.STRICT).parse( - StringUtils.strip(literalExpr.getStringValue(), "\r\n\t "))); - literalExpr = new DateLiteral(ld.atTime(0, 0, 0), Type.DATE); - } else if (funcName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME)) { - literalExpr = LiteralExpr.create(DateUtils.formatTimestampInSeconds( - Long.parseLong(literalExpr.getStringValue())), Type.DATETIME); - } else if (funcName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME_MS)) { - literalExpr = LiteralExpr.create(DateUtils.formatTimeStampInMill( - Long.parseLong(literalExpr.getStringValue()), TimeUtils.getSystemTimeZone().toZoneId()), - Type.DATETIME); - } - } else { - literalExpr = LiteralExpr.create(literalExpr.getStringValue(), - Objects.requireNonNull(Type.fromPrimitiveType(column.getPrimitiveType()))); - } - - PartitionColumnFilter filter = columnFilters.getOrDefault(slotRef.getColumnName(), - new PartitionColumnFilter()); - switch (binaryPredicate.getOp()) { - case EQ: - filter.setLowerBound(literalExpr, true); - filter.setUpperBound(literalExpr, true); - break; - case LE: - filter.setUpperBound(literalExpr, true); - filter.lowerBoundInclusive = true; - break; - case LT: - filter.setUpperBound(literalExpr, false); - filter.lowerBoundInclusive = true; - break; - case GE: - filter.setLowerBound(literalExpr, true); - break; - case GT: - filter.setLowerBound(literalExpr, false); - break; - default: - break; - } - columnFilters.put(slotRef.getColumnName(), filter); - } else if (condition instanceof InPredicate) { - InPredicate inPredicate = (InPredicate) condition; - if (inPredicate.isNotIn()) { - continue; - } - List list = Lists.newArrayList(); - Column column = nameToColumn.get(columnName); - for (int i = 1; i < inPredicate.getChildren().size(); i++) { - LiteralExpr literalExpr = (LiteralExpr) inPredicate.getChild(i); - literalExpr = LiteralExpr.create(literalExpr.getStringValue(), - Objects.requireNonNull(Type.fromPrimitiveType(column.getPrimitiveType()))); - list.add(literalExpr); - } - - PartitionColumnFilter filter = columnFilters.getOrDefault(slotRef.getColumnName(), - new PartitionColumnFilter()); - filter.setInPredicateLiterals(list); - columnFilters.put(slotRef.getColumnName(), filter); + /** + * Construct a fake sql then leverage the optimizer to prune partitions + * + * @return pruned partitions with delete conditions + */ + private List partitionPruneForDelete(DeleteStmt stmt, OlapTable table) { + String tableName = stmt.getTableName().toSql(); + String predicate = stmt.getWherePredicate().toSql(); + String fakeSql = String.format("SELECT * FROM %s WHERE %s", tableName, predicate); + PhysicalOlapScanOperator physicalOlapScanOperator; + try { + List parse = SqlParser.parse(fakeSql, ConnectContext.get().getSessionVariable()); + StatementBase selectStmt = parse.get(0); + Analyzer.analyze(selectStmt, ConnectContext.get()); + ExecPlan plan = StatementPlanner.plan(selectStmt, ConnectContext.get()); + List physicalOlapScanOperators = + Utils.extractPhysicalOlapScanOperator(plan.getPhysicalPlan()); + // it's supposed to be empty set + if (CollectionUtils.isEmpty(physicalOlapScanOperators)) { + return Lists.newArrayList(); } - + physicalOlapScanOperator = physicalOlapScanOperators.get(0); + } catch (Exception e) { + LOG.warn("failed to do partition pruning for delete {}", stmt.toString(), e); + return Lists.newArrayList(table.getVisiblePartitionNames()); } - return columnFilters; + List selectedPartitionId = physicalOlapScanOperator.getSelectedPartitionId(); + return ListUtils.emptyIfNull(selectedPartitionId) + .stream() + .map(x -> table.getPartition(x).getName()) + .collect(Collectors.toList()); } /** diff --git a/fe/fe-core/src/test/java/com/starrocks/load/DeletePruneTest.java b/fe/fe-core/src/test/java/com/starrocks/load/DeletePruneTest.java index bbbbe3233fed25..1add5769ec1aa3 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/DeletePruneTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/DeletePruneTest.java @@ -14,8 +14,12 @@ package com.starrocks.load; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.starrocks.analysis.TableName; import com.starrocks.catalog.Database; import com.starrocks.catalog.OlapTable; +import com.starrocks.common.FeConstants; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.ast.DeleteStmt; @@ -67,23 +71,39 @@ public static void beforeClass() throws Exception { "\"replication_num\" = \"1\",\n" + "\"in_memory\" = \"false\"\n" + ");") - .withTable("CREATE TABLE `test_delete2` (\n" + - " `date` date NULL COMMENT \"\",\n" + - " `id` int(11) NULL COMMENT \"\",\n" + - " `value` char(20) NULL COMMENT \"\"\n" + - ") ENGINE=OLAP \n" + - "DUPLICATE KEY(`date`, `id`)\n" + - "COMMENT \"OLAP\"\n" + - "PARTITION BY RANGE(`date`, `id`)\n" + - "(\n" + - " PARTITION `p202001_1000` VALUES LESS THAN (\"2020-02-01\", \"1000\"),\n" + - " PARTITION `p202002_2000` VALUES LESS THAN (\"2020-03-01\", \"2000\"),\n" + - " PARTITION `p202003_all` VALUES LESS THAN (\"2020-04-01\")\n" + - ")\n" + - "DISTRIBUTED BY HASH(`date`, `id`) BUCKETS 3 \n" + - "PROPERTIES (\n" + - "\"replication_num\" = \"1\"\n" + - ");"); + .withTable("CREATE TABLE `test_delete2` (\n" + + " `date` date NULL COMMENT \"\",\n" + + " `id` int(11) NULL COMMENT \"\",\n" + + " `value` char(20) NULL COMMENT \"\"\n" + + ") ENGINE=OLAP \n" + + "DUPLICATE KEY(`date`, `id`)\n" + + "COMMENT \"OLAP\"\n" + + "PARTITION BY RANGE(`date`, `id`)\n" + + "(\n" + + " PARTITION `p202001_1000` VALUES LESS THAN (\"2020-02-01\", \"1000\"),\n" + + " PARTITION `p202002_2000` VALUES LESS THAN (\"2020-03-01\", \"2000\"),\n" + + " PARTITION `p202003_all` VALUES LESS THAN (\"2020-04-01\")\n" + + ")\n" + + "DISTRIBUTED BY HASH(`date`, `id`) BUCKETS 3 \n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ");") + .withTable("CREATE TABLE `test_delete3` (" + + " `date` date NULL," + + " c1 int NULL" + + ") PARTITION BY (`date`)" + + " PROPERTIES ('replication_num'='1') ") + .withTable("CREATE TABLE `test_delete4` (" + + " `date` date NULL," + + " c1 int NULL) " + + " PROPERTIES ('replication_num'='1') "); + UtFrameUtils.mockDML(); + starRocksAssert.getCtx().executeSql("insert into test_delete3 values('2020-01-01', 1), ('2020-01-02', 2)"); + starRocksAssert.getCtx() + .executeSql("alter table test_delete3 add partition p20200101 values in ('2020-01-01')"); + starRocksAssert.getCtx() + .executeSql("alter table test_delete3 add partition p20200102 values in ('2020-01-02')"); + FeConstants.runningUnitTest = true; } @Test @@ -183,4 +203,47 @@ public void testDeletePruneMultiPartition() throws Exception { Assert.assertEquals(3, res.size()); } + @Test + public void testDeletePruneListPartition() throws Exception { + ConnectContext ctx = starRocksAssert.getCtx(); + OlapTable tbl = (OlapTable) starRocksAssert.getTable("test", "test_delete3"); + Assert.assertEquals(Sets.newHashSet("p20200101", "p20200102"), tbl.getVisiblePartitionNames()); + + // delete one partition + String deleteSQL = "delete from test_delete3 where date in ('2020-01-01') "; + DeleteStmt deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx); + List res = deleteHandler.extractPartitionNamesByCondition(deleteStmt, tbl); + Assert.assertEquals(1, res.size()); + Assert.assertEquals(res.get(0), "p20200101"); + + // delete two partitions + deleteSQL = "delete from test_delete3 where date in ('2020-01-01', '2020-01-02') "; + deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx); + res = deleteHandler.extractPartitionNamesByCondition(deleteStmt, tbl); + Assert.assertEquals(Lists.newArrayList("p20200101", "p20200102"), res); + + // exceptional + deleteSQL = "delete from test_delete3 where date in ('2020-01-01') "; + deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx); + DeleteStmt exceptionStmt = new DeleteStmt(TableName.fromString("not_exists"), deleteStmt.getPartitionNames(), + deleteStmt.getWherePredicate()); + exceptionStmt.setDeleteConditions(deleteStmt.getDeleteConditions()); + res = deleteHandler.extractPartitionNamesByCondition(exceptionStmt, tbl); + Assert.assertEquals(Lists.newArrayList("p20200102", "p20200101"), res); + } + + @Test + public void testDeleteUnPartitionTable() throws Exception { + ConnectContext ctx = starRocksAssert.getCtx(); + OlapTable tbl = (OlapTable) starRocksAssert.getTable("test", "test_delete4"); + Assert.assertEquals(Sets.newHashSet("test_delete4"), tbl.getVisiblePartitionNames()); + + // delete one partition + String deleteSQL = "delete from test_delete4 where date in ('2020-01-01') "; + DeleteStmt deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx); + List res = deleteHandler.extractPartitionNamesByCondition(deleteStmt, tbl); + Assert.assertEquals(1, res.size()); + Assert.assertEquals(res.get(0), "test_delete4"); + } + }