Skip to content

Commit

Permalink
[FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from Fl…
Browse files Browse the repository at this point in the history
…inkRelMdUpsertKeys
  • Loading branch information
lincoln-lil committed Sep 25, 2024
1 parent 59721cc commit afa1f99
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGr
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil}

import com.google.common.collect.ImmutableSet
import org.apache.calcite.plan.hep.HepRelVertex
Expand Down Expand Up @@ -91,23 +91,24 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
}

def getUpsertKeys(rel: Rank, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
val inputKeys = filterKeys(
FlinkRelMetadataQuery
.reuseOrCreate(mq)
.getUpsertKeys(rel.getInput),
rel.partitionKey)
FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys)
rel match {
case rank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
ImmutableSet.of(ImmutableBitSet.of(rank.partitionKey.toArray.map(Integer.valueOf).toList))
case _ =>
val inputKeys = filterKeys(
FlinkRelMetadataQuery
.reuseOrCreate(mq)
.getUpsertKeys(rel.getInput),
rel.partitionKey)
FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys)
}
}

def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] =
filterKeys(
FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput),
ImmutableBitSet.of(rel.getCollation.getKeys))

def getUpsertKeys(rel: StreamPhysicalDeduplicate, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList))
}

def getUpsertKeys(
rel: StreamPhysicalChangelogNormalize,
mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ LogicalProject(c=[$0], b=[$1], d=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d])
Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d])
+- Exchange(distribution=[hash[c, b]])
+- GroupAggregate(groupBy=[c, b], select=[c, b, SUM_RETRACT(a) FILTER $f3 AS d])
+- Exchange(distribution=[hash[c, b]])
Expand Down

0 comments on commit afa1f99

Please sign in to comment.