diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java index 342cafd2526a8..b6e3f1eaf8385 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java @@ -20,10 +20,10 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank; import org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule; import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil; @@ -57,8 +57,8 @@ * a table source or a view only if it contains the unique key and time attribute. * *
Flink supports extract the primary key and row time attribute from the view if the view comes - * from {@link StreamPhysicalRank} node which can convert to a {@link StreamPhysicalDeduplicate} - * node. + * from {@link StreamPhysicalRank} node which can eventually be converted to a + * {@link StreamExecDeduplicate} node. */ @Value.Enclosing public class TemporalJoinRewriteWithUniqueKeyRule diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala deleted file mode 100644 index dbc5e60043d29..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.planner.plan.nodes.physical.stream - -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate -import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils -import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} - -import java.util - -/** - * TODO to be removed after FLINK-34702 is fixed. Stream physical RelNode which deduplicate on keys - * and keeps only first row or last row. This node is an optimization of [[StreamPhysicalRank]] for - * some special cases. Compared to [[StreamPhysicalRank]], this node could use mini-batch and access - * less state. - */ -class StreamPhysicalDeduplicate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - uniqueKeys: Array[Int], - val isRowtime: Boolean, - val keepLastRow: Boolean) - extends SingleRel(cluster, traitSet, inputRel) - with StreamPhysicalRel { - - def getUniqueKeys: Array[Int] = uniqueKeys - - override def requireWatermark: Boolean = isRowtime - - override def deriveRowType(): RelDataType = getInput.getRowType - - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new StreamPhysicalDeduplicate( - cluster, - traitSet, - inputs.get(0), - uniqueKeys, - isRowtime, - keepLastRow) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - val fieldNames = getRowType.getFieldNames - val orderString = if (isRowtime) "ROWTIME" else "PROCTIME" - val keep = if (keepLastRow) "LastRow" else "FirstRow" - super - .explainTerms(pw) - .item("keep", keep) - .item("key", uniqueKeys.map(fieldNames.get).mkString(", ")) - .item("order", orderString) - } - - override def 
translateToExecNode(): ExecNode[_] = { - val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) - new StreamExecDeduplicate( - unwrapTableConfig(this), - uniqueKeys, - isRowtime, - keepLastRow, - generateUpdateBefore, - InputProperty.DEFAULT, - FlinkTypeFactory.toLogicalRowType(getRowType), - getRelDetailedDescription) - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala index 0c878a8fdcc73..63fbee077b487 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank -import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank} +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil} import org.apache.calcite.plan.RelOptRule @@ -28,10 +28,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.convert.ConverterRule.Config -/** - * Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. NOTES: the rank can - * not be converted to [[StreamPhysicalDeduplicate]]. - */ +/** Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. 
*/ class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) { override def convert(rel: RelNode): RelNode = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala index 8e6ceaa3cc88f..748a364f92161 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.ExpressionReducer import org.apache.flink.table.planner.plan.nodes.calcite.Rank import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank -import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank} +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalRank, StreamPhysicalWindowDeduplicate} import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange} import org.apache.calcite.plan.RelOptUtil @@ -327,7 +327,7 @@ object RankUtil { } /** - * Whether the given rank could be converted to [[StreamPhysicalDeduplicate]]. + * Whether the given rank could be converted to [[StreamPhysicalWindowDeduplicate]]. * * Returns true if the given rank is sorted by time attribute and limits 1 and its RankFunction is * ROW_NUMBER, else false. 
@@ -335,7 +335,7 @@ object RankUtil { * @param rank * The [[FlinkLogicalRank]] node * @return - * True if the input rank could be converted to [[StreamPhysicalDeduplicate]] + * True if the input rank could be converted to [[StreamPhysicalWindowDeduplicate]] */ def canConvertToDeduplicate(rank: FlinkLogicalRank): Boolean = { val sortCollation = rank.orderKey