-
Notifications
You must be signed in to change notification settings - Fork 13.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12173][table] Optimize SELECT DISTINCT #25752
base: master
Are you sure you want to change the base?
Conversation
...ink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
Outdated
Show resolved
Hide resolved
...rg/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
Outdated
Show resolved
Hide resolved
* e.g. {SELECT DISTINCT a, b, c;} will be converted to [[FlinkLogicalRank]] instead of | ||
* [[FlinkLogicalAggregate]] in rowtime. | ||
*/ | ||
class StreamLogicalOptimizeSelectDistinctRule |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yiyutian1 and I worked on this together and we started with a Scala example.
Given that we are working on migrating the Scala rules to Java, could you take a look at migrating this rule to Java?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comment Jim!
Are we migrating from Scala->Java for converters now too, or are we mainly doing it for builtInFunctions?
My impression is that for buildInFunctions we want that migration because auto-generated Java code is hard to maintain, but that doesn't seem to be a problem here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@snuyanzin , could you provide some feedback here?
Could we try merging this ticket as is, so that the optimizer can be available soon, and then we can do the migration in a separate effort?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be great to see green ci first
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CI is green! Woohoo!
Could we get some feedback? @snuyanzin
Many thanks.
...apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
Outdated
Show resolved
Hide resolved
...er/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
Outdated
Show resolved
Hide resolved
@flinkbot run azure |
@flinkbot run azure |
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
Hi @xuyangzhong, @wuchong @lincoln-lil, could I get your feedback on this PR? |
@xuyangzhong I noticed you discussed some of this changes with @jnh5y in FLINK-35792 //cc @lincoln-lil (i think you might be interested in this as well) |
* e.g. {SELECT DISTINCT a, b, c;} will be converted to [[FlinkLogicalRank]] instead of | ||
* [[FlinkLogicalAggregate]] in rowtime. | ||
*/ | ||
class StreamLogicalOptimizeSelectDistinctRule |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally it would be great to have it converted to java in this PR.
I looked into some previous PR and this is how it was: if a rule was new then it was requested to have it in java
if it is an old one then it's ok to convert it in a separate PR
|SELECT DISTINCT a, b, c | ||
|FROM MyTable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any IT case for that?
What is the purpose of the change
This is to optimize SELECT DISTINCT query from using GroupAgg to DeDuplicate.
Brief change log
This PR implements a new Calcite rule that does the following:
So instead, we are optimizing by converting the current plan from FlinkLogicalRank instead of FlinkLogicalAggregate in rowtime, which has the similar effect as of the original plan.
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change is already covered by existing tests in the table/planner module. For example:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation