Skip to content

Commit

Permalink
[FLINK-34702][table-planner] Remove physical node StreamPhysicalDedup…
Browse files Browse the repository at this point in the history
…licate
  • Loading branch information
lincoln-lil committed Sep 25, 2024
1 parent a134711 commit 2f8b2d8
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
import org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule;
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
Expand Down Expand Up @@ -57,8 +57,8 @@
* a table source or a view only if it contains the unique key and time attribute.
*
* <p>Flink supports extract the primary key and row time attribute from the view if the view comes
* from {@link StreamPhysicalRank} node which can convert to a {@link StreamPhysicalDeduplicate}
* node.
* from {@link StreamPhysicalRank} node which can convert to a {@link StreamExecDeduplicate} node
* finally.
*/
@Value.Enclosing
public class TemporalJoinRewriteWithUniqueKeyRule
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank
import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil}

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config

/**
* Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. NOTES: the rank can
* not be converted to [[StreamPhysicalDeduplicate]].
*/
/** Rule that converts [[FlinkLogicalRank]] with fetch to [[StreamPhysicalRank]]. */
class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {

override def convert(rel: RelNode): RelNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.ExpressionReducer
import org.apache.flink.table.planner.plan.nodes.calcite.Rank
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalRank, StreamPhysicalWindowDeduplicate}
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange}

import org.apache.calcite.plan.RelOptUtil
Expand Down Expand Up @@ -327,15 +327,15 @@ object RankUtil {
}

/**
* Whether the given rank could be converted to [[StreamPhysicalDeduplicate]].
* Whether the given rank could be converted to [[StreamPhysicalWindowDeduplicate]].
*
* Returns true if the given rank is sorted by time attribute and limits 1 and its RankFunction is
* ROW_NUMBER, else false.
*
* @param rank
* The [[FlinkLogicalRank]] node
* @return
* True if the input rank could be converted to [[StreamPhysicalDeduplicate]]
* True if the input rank could be converted to [[StreamPhysicalWindowDeduplicate]]
*/
def canConvertToDeduplicate(rank: FlinkLogicalRank): Boolean = {
val sortCollation = rank.orderKey
Expand Down

0 comments on commit 2f8b2d8

Please sign in to comment.