From afa1f99f829a9ee8bf8654789378acd14d5c993c Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Tue, 24 Sep 2024 23:36:04 +0800 Subject: [PATCH] [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdUpsertKeys --- .../plan/metadata/FlinkRelMdUpsertKeys.scala | 23 ++++++++++--------- .../planner/plan/stream/sql/RankTest.xml | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala index 358862826a2d4..df151c8a607f7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala @@ -24,7 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGr import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin import org.apache.flink.table.planner.plan.nodes.physical.stream._ import org.apache.flink.table.planner.plan.schema.IntermediateRelTable -import org.apache.flink.table.planner.plan.utils.FlinkRexUtil +import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil} import com.google.common.collect.ImmutableSet import org.apache.calcite.plan.hep.HepRelVertex @@ -91,12 +91,17 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { } def getUpsertKeys(rel: Rank, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { - val inputKeys = filterKeys( - FlinkRelMetadataQuery - .reuseOrCreate(mq) - .getUpsertKeys(rel.getInput), - rel.partitionKey) - FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys) + rel match { + case rank: StreamPhysicalRank if RankUtil.isDeduplication(rel) => + ImmutableSet.of(ImmutableBitSet.of(rank.partitionKey.toArray.map(Integer.valueOf).toList)) + case _ => + val inputKeys = filterKeys( + FlinkRelMetadataQuery + .reuseOrCreate(mq) + .getUpsertKeys(rel.getInput), + rel.partitionKey) + FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys) + } } def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] = @@ -104,10 +109,6 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput), ImmutableBitSet.of(rel.getCollation.getKeys)) - def getUpsertKeys(rel: StreamPhysicalDeduplicate, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { - ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList)) - } - def getUpsertKeys( rel: StreamPhysicalChangelogNormalize, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml index 6ea1d56dc7391..f6abe90d2d0f4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml @@ -1176,7 +1176,7 @@ LogicalProject(c=[$0], b=[$1], d=[$2])