[FLINK-34702][table-planner] Refactor Deduplicate optimization to defer to StreamPhysicalRank for valid StreamExecDeduplicate node conversion to avoid exceptions #25380
* Stream physical RelNode which deduplicate on keys and keeps only first row or last row. This node
* is an optimization of [[StreamPhysicalRank]] for some special cases. Compared to
* [[StreamPhysicalRank]], this node could use mini-batch and access less state.
* TODO to be removed after FLINK-34702 is fixed. Stream physical RelNode which deduplicate on keys
Since this operator may be used in existing Flink jobs, I do not believe we can remove it.
The StreamPhysicalDeduplicate is just an intermediate node in the optimization process. After all the optimizations are done, it is converted to StreamExecDeduplicate and eventually to the actual runtime SQL deduplicate operator.
Also, the compiled plan does not depend on the physical node, but on the exec node. Therefore, removing it does not affect existing SQL jobs.
55d423f to 5b795a3
.../flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
.../org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
@@ -321,7 +321,7 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti
: +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]], fields=[amount, currency, rowtime], changelogMode=[I])
The physical plan looks good to me and the exec plan is unchanged. +1
LGTM, I only left one minor comment.
…d instead decide to perform exec node transformations in StreamPhysicalRank
…inkChangelogModeInferenceProgram and StreamNonDeterministicUpdatePlanVisitor
…inkRelMdColumnUniqueness
…inkRelMdUniqueKeys
…inkRelMdModifiedMonotonicity
…inkRelMdUpsertKeys
75391b0 to 96f0dba
Thank you for reviewing this! I've rebased onto the latest master and will merge it once CI is green.
What is the purpose of the change
This refactor stops using StreamPhysicalDeduplicate and instead decides which exec node to produce inside StreamPhysicalRank, so that various queries no longer fail with the exception "StreamPhysicalDeduplicate doesn't support consuming update changes...".
Brief change log
Defer the rank/dedup transformation to the physical -> exec phase
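The idea of deferring the choice can be illustrated with a small, self-contained sketch. It is not the planner's code: `PhysicalRank`, `ExecDeduplicate`, `ExecRank`, and every field and method on them are hypothetical stand-ins, and the conditions used here (ROW_NUMBER keeping only rank 1, ordering on a time attribute, insert-only input) are assumptions chosen for illustration rather than the exact checks in this PR.

```java
public class DeferredDedupSketch {

    /** Hypothetical stand-in for an exec node; not Flink's real ExecNode API. */
    interface ExecNode {
        String name();
    }

    /** Hypothetical exec node for the specialized deduplicate path. */
    static final class ExecDeduplicate implements ExecNode {
        final boolean keepLastRow;

        ExecDeduplicate(boolean keepLastRow) {
            this.keepLastRow = keepLastRow;
        }

        @Override
        public String name() {
            return keepLastRow ? "Deduplicate(keepLastRow)" : "Deduplicate(keepFirstRow)";
        }
    }

    /** Hypothetical exec node for the general rank path. */
    static final class ExecRank implements ExecNode {
        final long rankStart;
        final long rankEnd;

        ExecRank(long rankStart, long rankEnd) {
            this.rankStart = rankStart;
            this.rankEnd = rankEnd;
        }

        @Override
        public String name() {
            return "Rank(" + rankStart + ".." + rankEnd + ")";
        }
    }

    /** Hypothetical physical rank node that picks its exec node only at translation time. */
    static final class PhysicalRank {
        final boolean isRowNumber;
        final long rankStart;
        final long rankEnd;
        final boolean sortsOnTimeAttribute;
        final boolean descending;
        final boolean inputInsertOnly;

        PhysicalRank(boolean isRowNumber, long rankStart, long rankEnd,
                     boolean sortsOnTimeAttribute, boolean descending, boolean inputInsertOnly) {
            this.isRowNumber = isRowNumber;
            this.rankStart = rankStart;
            this.rankEnd = rankEnd;
            this.sortsOnTimeAttribute = sortsOnTimeAttribute;
            this.descending = descending;
            this.inputInsertOnly = inputInsertOnly;
        }

        /** Assumed shape check: ROW_NUMBER, only rank 1, ordered by a time attribute. */
        boolean hasDeduplicateShape() {
            return isRowNumber && rankStart == 1 && rankEnd == 1 && sortsOnTimeAttribute;
        }

        /** The rank/dedup decision is made here, in the physical -> exec step. */
        ExecNode translateToExecNode() {
            if (hasDeduplicateShape() && inputInsertOnly) {
                // Descending order on the time attribute keeps the latest row per key.
                return new ExecDeduplicate(descending);
            }
            // Anything else, including update-producing input, stays a general rank.
            return new ExecRank(rankStart, rankEnd);
        }
    }

    public static void main(String[] args) {
        PhysicalRank insertOnly = new PhysicalRank(true, 1, 1, true, true, true);
        PhysicalRank updatingInput = new PhysicalRank(true, 1, 1, true, true, false);
        System.out.println(insertOnly.translateToExecNode().name());
        System.out.println(updatingInput.translateToExecNode().name());
    }
}
```

Because the optimizer only ever sees one physical node type in this sketch, a query whose input turns out to produce updates falls back to the general rank path instead of hitting an unsupported-input error on a separate deduplicate node.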
Verifying this change
Streaming Rank/DedupTest and newly added test cases
Does this pull request potentially affect one of the following parts:
Documentation