From 7d5235bd61c687edaba5b5ae7f3b6b782ef07236 Mon Sep 17 00:00:00 2001 From: morrySnow Date: Fri, 11 Aug 2023 20:26:26 +0800 Subject: [PATCH] [fix](Nereids) type check could not work when root node is table or file sink --- .../analyzer/UnboundOlapTableSink.java | 3 +- .../nereids/analyzer/UnboundResultSink.java | 5 +- .../doris/nereids/jobs/executor/Analyzer.java | 4 +- .../nereids/parser/LogicalPlanBuilder.java | 2 +- .../apache/doris/nereids/rules/RuleType.java | 1 + .../rules/analysis/BindInsertTargetTable.java | 209 ---------------- .../nereids/rules/analysis/BindSink.java | 227 ++++++++++++++++++ .../LogicalDeferMaterializeResultSink.java | 9 +- .../trees/plans/logical/LogicalFileSink.java | 43 ++-- .../plans/logical/LogicalOlapTableSink.java | 57 +++-- .../plans/logical/LogicalResultSink.java | 45 +--- .../trees/plans/logical/LogicalSink.java | 51 +++- 12 files changed, 337 insertions(+), 319 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java index 484ab16b509427..b7306fd44eef75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Objects; @@ -63,7 +64,7 @@ public UnboundOlapTableSink(List nameParts, List colNames, List< public UnboundOlapTableSink(List nameParts, List colNames, List hints, List partitions, boolean isPartialUpdate, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child); this.nameParts = Utils.copyRequiredList(nameParts); this.colNames = Utils.copyRequiredList(colNames); this.hints = Utils.copyRequiredList(hints); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java index 9743d0fe8f5eb4..3c1519d6f80c97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundResultSink.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Optional; @@ -40,12 +41,12 @@ public class UnboundResultSink extends LogicalSink implements Unbound, Sink { public UnboundResultSink(CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, child); + super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), child); } public UnboundResultSink(Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_UNBOUND_RESULT_SINK, ImmutableList.of(), groupExpression, logicalProperties, child); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java index 96afa6b24ad0b5..a620d6b5b13b6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Analyzer.java @@ -22,9 +22,9 @@ import org.apache.doris.nereids.rules.analysis.AdjustAggregateNullableForEmptySet; import org.apache.doris.nereids.rules.analysis.AnalyzeCTE; import org.apache.doris.nereids.rules.analysis.BindExpression; -import org.apache.doris.nereids.rules.analysis.BindInsertTargetTable; import org.apache.doris.nereids.rules.analysis.BindRelation; import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver; +import org.apache.doris.nereids.rules.analysis.BindSink; import org.apache.doris.nereids.rules.analysis.CheckAnalysis; import org.apache.doris.nereids.rules.analysis.CheckBound; import org.apache.doris.nereids.rules.analysis.CheckPolicy; @@ -86,7 +86,7 @@ private static List buildAnalyzeJobs(Optional c new UserAuthentication(), new BindExpression() ), - topDown(new BindInsertTargetTable()), + topDown(new BindSink()), bottomUp(new CheckBound()), bottomUp( new ProjectToGlobalAggregate(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index a23825a8427400..c73ca04a2f249f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1496,7 +1496,7 @@ private LogicalPlan withOutFile(LogicalPlan plan, OutFileClauseContext ctx) { properties.put(key, value); } Literal filePath = (Literal) visit(ctx.filePath); - return new LogicalFileSink<>(filePath.getStringValue(), format, properties, plan); + return new LogicalFileSink<>(filePath.getStringValue(), format, properties, ImmutableList.of(), plan); } private LogicalPlan withQueryOrganization(LogicalPlan inputPlan, QueryOrganizationContext ctx) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index fbefe06456102c..fb3569916419a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -57,6 +57,7 @@ public enum RuleType { BINDING_SET_OPERATION_SLOT(RuleTypeClass.REWRITE), BINDING_GENERATE_FUNCTION(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_FILE(RuleTypeClass.REWRITE), REPLACE_SORT_EXPRESSION_BY_CHILD_OUTPUT(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java deleted file mode 100644 index 750413b919ca40..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java +++ /dev/null @@ -1,209 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.rules.analysis; - -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.Pair; -import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; -import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.parser.NereidsParser; -import org.apache.doris.nereids.rules.Rule; -import org.apache.doris.nereids.rules.RuleType; -import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.literal.Literal; -import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; -import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.logical.LogicalProject; -import org.apache.doris.nereids.types.DataType; -import org.apache.doris.nereids.util.RelationUtil; -import org.apache.doris.nereids.util.TypeCoercionUtils; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * bind an unbound logicalOlapTableSink represent the target table of an insert command - */ -public class BindInsertTargetTable extends OneAnalysisRuleFactory { - @Override - public Rule build() { - return unboundOlapTableSink() - .thenApply(ctx -> { - UnboundOlapTableSink sink = ctx.root; - Pair pair = bind(ctx.cascadesContext, sink); - Database database = pair.first; - OlapTable table = pair.second; - - LogicalPlan child = ((LogicalPlan) sink.child()); - - LogicalOlapTableSink boundSink = new LogicalOlapTableSink<>( - database, - table, - bindTargetColumns(table, sink.getColNames()), - bindPartitionIds(table, sink.getPartitions()), - sink.isPartialUpdate(), - sink.child()); - - // we need to insert all the columns of the target table although some columns are not mentions. - // so we add a projects to supply the default value. - - if (boundSink.getCols().size() != child.getOutput().size()) { - throw new AnalysisException("insert into cols should be corresponding to the query output"); - } - - Map columnToChildOutput = Maps.newHashMap(); - for (int i = 0; i < boundSink.getCols().size(); ++i) { - columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); - } - - Map columnToOutput = Maps.newLinkedHashMap(); - NereidsParser expressionParser = new NereidsParser(); - - // generate slots not mentioned in sql, mv slots and shaded slots. - for (Column column : boundSink.getTargetTable().getFullSchema()) { - if (column.isMaterializedViewColumn()) { - List refs = column.getRefColumns(); - // now we have to replace the column to slots. - Preconditions.checkArgument(refs != null, - "mv column's ref column cannot be null"); - Expression parsedExpression = expressionParser.parseExpression( - column.getDefineExpr().toSql()); - Expression boundExpression = SlotReplacer.INSTANCE - .replace(parsedExpression, columnToOutput); - - NamedExpression slot = boundExpression instanceof NamedExpression - ? ((NamedExpression) boundExpression) - : new Alias(boundExpression, boundExpression.toSql()); - - columnToOutput.put(column.getName(), slot); - } else if (columnToChildOutput.containsKey(column)) { - columnToOutput.put(column.getName(), columnToChildOutput.get(column)); - } else { - if (table.hasSequenceCol() - && column.getName().equals(Column.SEQUENCE_COL) - && table.getSequenceMapCol() != null) { - Column seqCol = table.getFullSchema().stream() - .filter(col -> col.getName().equals(table.getSequenceMapCol())) - .findFirst().get(); - columnToOutput.put(column.getName(), columnToOutput.get(seqCol.getName())); - } else if (column.getDefaultValue() == null) { - columnToOutput.put(column.getName(), new Alias( - new NullLiteral(DataType.fromCatalogType(column.getType())), - column.getName() - )); - } else { - columnToOutput.put(column.getName(), new Alias(Literal.of(column.getDefaultValue()) - .checkedCastTo(DataType.fromCatalogType(column.getType())), column.getName())); - } - } - } - List fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); - - LogicalProject fullOutputProject = new LogicalProject<>(fullOutputExprs, boundSink.child()); - - // add cast project - List castExprs = Lists.newArrayList(); - for (int i = 0; i < table.getFullSchema().size(); ++i) { - Expression castExpr = TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i), - DataType.fromCatalogType(table.getFullSchema().get(i).getType())); - if (castExpr instanceof NamedExpression) { - castExprs.add(((NamedExpression) castExpr)); - } else { - castExprs.add(new Alias(castExpr, castExpr.toSql())); - } - } - if (!castExprs.equals(fullOutputExprs)) { - fullOutputProject = new LogicalProject(castExprs, fullOutputProject); - } - - return boundSink.withChildren(fullOutputProject); - - }).toRule(RuleType.BINDING_INSERT_TARGET_TABLE); - } - - private Pair bind(CascadesContext cascadesContext, UnboundOlapTableSink sink) { - List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), - sink.getNameParts()); - Pair pair = RelationUtil.getDbAndTable(tableQualifier, - cascadesContext.getConnectContext().getEnv()); - if (!(pair.second instanceof OlapTable)) { - throw new AnalysisException("the target table of insert into is not an OLAP table"); - } - return Pair.of(((Database) pair.first), (OlapTable) pair.second); - } - - private List bindPartitionIds(OlapTable table, List partitions) { - return partitions.isEmpty() - ? ImmutableList.of() - : partitions.stream().map(pn -> { - Partition partition = table.getPartition(pn); - if (partition == null) { - throw new AnalysisException(String.format("partition %s is not found in table %s", - pn, table.getName())); - } - return partition.getId(); - }).collect(Collectors.toList()); - } - - private List bindTargetColumns(OlapTable table, List colsName) { - return colsName.isEmpty() - ? table.getFullSchema().stream().filter(column -> column.isVisible() - && !column.isMaterializedViewColumn()) - .collect(Collectors.toList()) - : colsName.stream().map(cn -> { - Column column = table.getColumn(cn); - if (column == null) { - throw new AnalysisException(String.format("column %s is not found in table %s", - cn, table.getName())); - } - return column; - }).collect(Collectors.toList()); - } - - private static class SlotReplacer extends DefaultExpressionRewriter> { - public static final SlotReplacer INSTANCE = new SlotReplacer(); - - public Expression replace(Expression e, Map replaceMap) { - return e.accept(this, replaceMap); - } - - @Override - public Expression visitUnboundSlot(UnboundSlot unboundSlot, Map replaceMap) { - return replaceMap.get(unboundSlot.getName()); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java new file mode 100644 index 00000000000000..69896e44ee2e92 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.analysis; + +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.nereids.util.TypeCoercionUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * bind an unbound logicalOlapTableSink represent the target table of an insert command + */ +public class BindSink implements AnalysisRuleFactory { + + @Override + public List buildRules() { + return ImmutableList.of( + RuleType.BINDING_INSERT_TARGET_TABLE.build( + unboundOlapTableSink().thenApply(ctx -> { + UnboundOlapTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + Database database = pair.first; + OlapTable table = pair.second; + + LogicalPlan child = ((LogicalPlan) sink.child()); + + LogicalOlapTableSink boundSink = new LogicalOlapTableSink<>( + database, + table, + bindTargetColumns(table, sink.getColNames()), + bindPartitionIds(table, sink.getPartitions()), + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.isPartialUpdate(), + sink.child()); + + // we need to insert all the columns of the target table + // although some columns are not mentions. + // so we add a projects to supply the default value. + + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException( + "insert into cols should be corresponding to the query output"); + } + + Map columnToChildOutput = Maps.newHashMap(); + for (int i = 0; i < boundSink.getCols().size(); ++i) { + columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); + } + + Map columnToOutput = Maps.newLinkedHashMap(); + NereidsParser expressionParser = new NereidsParser(); + + // generate slots not mentioned in sql, mv slots and shaded slots. + for (Column column : boundSink.getTargetTable().getFullSchema()) { + if (column.isMaterializedViewColumn()) { + List refs = column.getRefColumns(); + // now we have to replace the column to slots. + Preconditions.checkArgument(refs != null, + "mv column's ref column cannot be null"); + Expression parsedExpression = expressionParser.parseExpression( + column.getDefineExpr().toSql()); + Expression boundExpression = SlotReplacer.INSTANCE + .replace(parsedExpression, columnToOutput); + + NamedExpression slot = boundExpression instanceof NamedExpression + ? ((NamedExpression) boundExpression) + : new Alias(boundExpression, boundExpression.toSql()); + + columnToOutput.put(column.getName(), slot); + } else if (columnToChildOutput.containsKey(column)) { + columnToOutput.put(column.getName(), columnToChildOutput.get(column)); + } else { + if (table.hasSequenceCol() + && column.getName().equals(Column.SEQUENCE_COL) + && table.getSequenceMapCol() != null) { + Column seqCol = table.getFullSchema().stream() + .filter(col -> col.getName().equals(table.getSequenceMapCol())) + .findFirst().get(); + columnToOutput.put(column.getName(), columnToOutput.get(seqCol.getName())); + } else if (column.getDefaultValue() == null) { + columnToOutput.put(column.getName(), new Alias( + new NullLiteral(DataType.fromCatalogType(column.getType())), + column.getName() + )); + } else { + columnToOutput.put(column.getName(), + new Alias(Literal.of(column.getDefaultValue()) + .checkedCastTo(DataType.fromCatalogType(column.getType())), + column.getName())); + } + } + } + List fullOutputExprs = ImmutableList.copyOf(columnToOutput.values()); + + LogicalProject fullOutputProject = new LogicalProject<>(fullOutputExprs, + boundSink.child()); + + // add cast project + List castExprs = Lists.newArrayList(); + for (int i = 0; i < table.getFullSchema().size(); ++i) { + Expression castExpr = TypeCoercionUtils.castIfNotSameType(fullOutputExprs.get(i), + DataType.fromCatalogType(table.getFullSchema().get(i).getType())); + if (castExpr instanceof NamedExpression) { + castExprs.add(((NamedExpression) castExpr)); + } else { + castExprs.add(new Alias(castExpr, castExpr.toSql())); + } + } + if (!castExprs.equals(fullOutputExprs)) { + fullOutputProject = new LogicalProject(castExprs, fullOutputProject); + } + + return boundSink.withChildAndUpdateOutput(fullOutputProject); + + })), + RuleType.BINDING_INSERT_FILE.build( + logicalFileSink().when(s -> s.getOutputExprs().isEmpty()) + .then(fileSink -> fileSink.withOutputExprs( + fileSink.child().getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()))) + ) + ); + } + + private Pair bind(CascadesContext cascadesContext, UnboundOlapTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (!(pair.second instanceof OlapTable)) { + throw new AnalysisException("the target table of insert into is not an OLAP table"); + } + return Pair.of(((Database) pair.first), (OlapTable) pair.second); + } + + private List bindPartitionIds(OlapTable table, List partitions) { + return partitions.isEmpty() + ? ImmutableList.of() + : partitions.stream().map(pn -> { + Partition partition = table.getPartition(pn); + if (partition == null) { + throw new AnalysisException(String.format("partition %s is not found in table %s", + pn, table.getName())); + } + return partition.getId(); + }).collect(Collectors.toList()); + } + + private List bindTargetColumns(OlapTable table, List colsName) { + return colsName.isEmpty() + ? table.getFullSchema().stream().filter(column -> column.isVisible() + && !column.isMaterializedViewColumn()) + .collect(Collectors.toList()) + : colsName.stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(Collectors.toList()); + } + + private static class SlotReplacer extends DefaultExpressionRewriter> { + public static final SlotReplacer INSTANCE = new SlotReplacer(); + + public Expression replace(Expression e, Map replaceMap) { + return e.accept(this, replaceMap); + } + + @Override + public Expression visitUnboundSlot(UnboundSlot unboundSlot, Map replaceMap) { + return replaceMap.get(unboundSlot.getName()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java index 5dfbd4eba9554c..277d0dc903ad7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java @@ -21,7 +21,6 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -54,7 +53,8 @@ public LogicalDeferMaterializeResultSink(LogicalResultSink logic OlapTable olapTable, long selectedIndexId, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(logicalResultSink.getType(), groupExpression, logicalProperties, child); + super(logicalResultSink.getType(), logicalResultSink.getOutputExprs(), + groupExpression, logicalProperties, child); this.logicalResultSink = logicalResultSink; this.olapTable = olapTable; this.selectedIndexId = selectedIndexId; @@ -109,11 +109,6 @@ public LogicalDeferMaterializeResultSink withGroupExprLogicalPropChildren( olapTable, selectedIndexId, groupExpression, logicalProperties, children.get(0)); } - @Override - public List computeOutput() { - return child().getOutput(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java index 89918d231744d1..0dda497d285ac7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileSink.java @@ -19,15 +19,13 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.List; @@ -44,23 +42,29 @@ public class LogicalFileSink extends LogicalSink properties; - public LogicalFileSink(String filePath, String format, Map properties, CHILD_TYPE child) { - this(filePath, format, properties, Optional.empty(), Optional.empty(), child); + public LogicalFileSink(String filePath, String format, + Map properties, List outputExprs, CHILD_TYPE child) { + this(filePath, format, properties, outputExprs, Optional.empty(), Optional.empty(), child); } public LogicalFileSink(String filePath, String format, Map properties, + List outputExprs, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_FILE_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_FILE_SINK, outputExprs, groupExpression, logicalProperties, child); this.filePath = Objects.requireNonNull(filePath); this.format = Objects.requireNonNull(format); this.properties = ImmutableMap.copyOf(Objects.requireNonNull(properties)); } + public Plan withOutputExprs(List outputExprs) { + return new LogicalFileSink<>(filePath, format, properties, outputExprs, child()); + } + @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalFileSink<>(filePath, format, properties, children.get(0)); + return new LogicalFileSink<>(filePath, format, properties, outputExprs, children.get(0)); } @Override @@ -68,11 +72,6 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitLogicalFileSink(this, context); } - @Override - public List getExpressions() { - return ImmutableList.of(); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -81,33 +80,31 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } + if (!super.equals(o)) { + return false; + } LogicalFileSink that = (LogicalFileSink) o; - return Objects.equals(filePath, that.filePath) - && Objects.equals(format, that.format) + return Objects.equals(filePath, that.filePath) && Objects.equals(format, that.format) && Objects.equals(properties, that.properties); } @Override public int hashCode() { - return Objects.hash(filePath, format, properties); + return Objects.hash(super.hashCode(), filePath, format, properties); } @Override public Plan withGroupExpression(Optional groupExpression) { - return new LogicalFileSink<>(filePath, format, properties, groupExpression, - Optional.of(getLogicalProperties()), child()); + return new LogicalFileSink<>(filePath, format, properties, outputExprs, + groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalFileSink<>(filePath, format, properties, groupExpression, logicalProperties, children.get(0)); - } - - @Override - public List computeOutput() { - return child().getOutput(); + return new LogicalFileSink<>(filePath, format, properties, outputExprs, + groupExpression, logicalProperties, children.get(0)); } public String getFilePath() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index 631b6b6ab3e7ba..3f59872d36bf29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -22,8 +22,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; @@ -49,18 +48,19 @@ public class LogicalOlapTableSink extends LogicalSink cols, List partitionIds, - boolean isPartialUpdate, CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, isPartialUpdate, Optional.empty(), Optional.empty(), child); + List outputExprs, boolean isPartialUpdate, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + Optional.empty(), Optional.empty(), child); } /** * constructor */ public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, - List partitionIds, boolean isPartialUpdate, Optional groupExpression, - Optional logicalProperties, + List partitionIds, List outputExprs, boolean isPartialUpdate, + Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.cols = Utils.copyRequiredList(cols); @@ -68,10 +68,18 @@ public LogicalOlapTableSink(Database database, OlapTable targetTable, List output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, isPartialUpdate, + Optional.empty(), Optional.empty(), child); + } + @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, Optional.empty(), Optional.empty(), children.get(0)); } @@ -103,17 +111,18 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - LogicalOlapTableSink sink = (LogicalOlapTableSink) o; - return isPartialUpdate == sink.isPartialUpdate() - && Objects.equals(database, sink.database) - && Objects.equals(targetTable, sink.targetTable) - && Objects.equals(partitionIds, sink.partitionIds) - && Objects.equals(cols, sink.cols); + if (!super.equals(o)) { + return false; + } + LogicalOlapTableSink that = (LogicalOlapTableSink) o; + return isPartialUpdate == that.isPartialUpdate && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) + && Objects.equals(partitionIds, that.partitionIds); } @Override public int hashCode() { - return Objects.hash(database, targetTable, partitionIds, cols, isPartialUpdate); + return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, isPartialUpdate); } @Override @@ -121,26 +130,16 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitLogicalOlapTableSink(this, context); } - @Override - public List getExpressions() { - return ImmutableList.of(); - } - @Override public Plan withGroupExpression(Optional groupExpression) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, - Optional.of(getLogicalProperties()), child()); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, - logicalProperties, children.get(0)); - } - - @Override - public List computeOutput() { - return child().getOutput(); + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, + groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java index cc2eb0cf377446..6e754b491a94fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java @@ -19,9 +19,7 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; @@ -29,10 +27,8 @@ import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import java.util.List; -import java.util.Objects; import java.util.Optional; /** @@ -40,22 +36,14 @@ */ public class LogicalResultSink extends LogicalSink implements Sink { - private final List outputExprs; - public LogicalResultSink(List outputExprs, CHILD_TYPE child) { - super(PlanType.LOGICAL_RESULT_SINK, child); - this.outputExprs = outputExprs; + super(PlanType.LOGICAL_RESULT_SINK, outputExprs, child); } public LogicalResultSink(List outputExprs, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_RESULT_SINK, groupExpression, logicalProperties, child); - this.outputExprs = outputExprs; - } - - public List getOutputExprs() { - return outputExprs; + super(PlanType.LOGICAL_RESULT_SINK, outputExprs, groupExpression, logicalProperties, child); } @Override @@ -70,11 +58,6 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitLogicalResultSink(this, context); } - @Override - public List getExpressions() { - return outputExprs; - } - @Override public LogicalResultSink withGroupExpression(Optional groupExpression) { return new LogicalResultSink<>(outputExprs, groupExpression, Optional.of(getLogicalProperties()), child()); @@ -87,33 +70,9 @@ public LogicalResultSink withGroupExprLogicalPropChildren(Optional(outputExprs, groupExpression, logicalProperties, children.get(0)); } - @Override - public List computeOutput() { - return outputExprs.stream() - .map(NamedExpression::toSlot) - .collect(ImmutableList.toImmutableList()); - } - @Override public String toString() { return Utils.toSqlString("LogicalResultSink[" + id.asInt() + "]", "outputExprs", outputExprs); } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LogicalResultSink that = (LogicalResultSink) o; - return Objects.equals(outputExprs, that.outputExprs); - } - - @Override - public int hashCode() { - return Objects.hash(outputExprs); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java index fd98c29a05ad6d..3d10c639b324fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSink.java @@ -19,21 +19,68 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; import java.util.Optional; /** abstract logical sink */ public abstract class LogicalSink extends LogicalUnary { - public LogicalSink(PlanType type, CHILD_TYPE child) { + protected final List outputExprs; + + public LogicalSink(PlanType type, List outputExprs, CHILD_TYPE child) { super(type, child); + this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not null")); } - public LogicalSink(PlanType type, + public LogicalSink(PlanType type, List outputExprs, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(type, groupExpression, logicalProperties, child); + this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not null")); + } + + public List getOutputExprs() { + return outputExprs; + } + + @Override + public List getExpressions() { + return outputExprs; + } + + @Override + public List computeOutput() { + return outputExprs.stream() + .map(NamedExpression::toSlot) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalSink that = (LogicalSink) o; + return Objects.equals(outputExprs, that.outputExprs); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), outputExprs); } }