-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add window expression part 1 - logical and physical planning, structure, to/from proto, and explain, for empty over clause only #334
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a cool start @jimexist 👍
7ae445a
to
11e9541
Compare
I wonder which approach makes more sense:
|
@jimexist I think it would be perfectly fine if more "advanced" features like window frames etc. are not supported, just as not all window functions have to be supported. |
ad02da7
to
a300aae
Compare
@Dandandan @alamb PTAL |
I will try to review this later today but I may not get to it until tomorrow |
|
||
message WindowExprNode { | ||
oneof window_function { | ||
AggregateFunction aggr_function = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked whether this makes sense to reuse aggregate functions for window expressions - I think it does! E.g. PostgreSQL also says:
https://www.postgresql.org/docs/9.3/functions-window.html
In addition to these functions, any built-in or user-defined aggregate function can be used as a window function (see Section 9.20 for a list of the built-in aggregates). Aggregate functions act as window functions only when an OVER clause follows the call; otherwise they act as regular aggregates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes in general three types of things can be used:
- aggregation
- UDAF
- built in window function
for both 1. and 2. they are not order sensitive, but for 3 we'll have to take sort into account
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[postgres] # explain select c1, count(c3) over (partition by c1 order by c3) from test;
QUERY PLAN
------------------------------------------------------------------
WindowAgg (cost=6.32..8.32 rows=100 width=12)
-> Sort (cost=6.32..6.57 rows=100 width=4)
Sort Key: c1, c3
-> Seq Scan on test (cost=0.00..3.00 rows=100 width=4)
(4 rows)
[postgres] # explain select c1, first_value(c3) over (partition by c1 order by c3) from test;
QUERY PLAN
------------------------------------------------------------------
WindowAgg (cost=6.32..8.32 rows=100 width=6)
-> Sort (cost=6.32..6.57 rows=100 width=4)
Sort Key: c1, c3
-> Seq Scan on test (cost=0.00..3.00 rows=100 width=4)
(4 rows)
IMO only the second time we'll really need to sort by c3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also fun thing to notice:
[postgres] # explain analyze select c1, sum(c3) over (partition by c1 order by c3), avg(c3) over (partition by c1 order by c3 desc) from test;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
WindowAgg (cost=11.64..13.64 rows=100 width=44) (actual time=1.287..1.373 rows=100 loops=1)
-> Sort (cost=11.64..11.89 rows=100 width=36) (actual time=1.281..1.292 rows=100 loops=1)
Sort Key: c1, c3
Sort Method: quicksort Memory: 31kB
-> WindowAgg (cost=6.32..8.32 rows=100 width=36) (actual time=1.051..1.174 rows=100 loops=1)
-> Sort (cost=6.32..6.57 rows=100 width=4) (actual time=0.221..0.231 rows=100 loops=1)
Sort Key: c1, c3 DESC
Sort Method: quicksort Memory: 29kB
-> Seq Scan on test (cost=0.00..3.00 rows=100 width=4) (actual time=0.010..0.028 rows=100 loops=1)
Planning Time: 0.087 ms
Execution Time: 1.437 ms
(11 rows)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked whether this makes sense to reuse aggregate functions for window expressions - I think it does! E.g. PostgreSQL also says:
https://www.postgresql.org/docs/9.3/functions-window.htmlIn addition to these functions, any built-in or user-defined aggregate function can be used as a window function (see Section 9.20 for a list of the built-in aggregates). Aggregate functions act as window functions only when an OVER clause follows the call; otherwise they act as regular aggregates.
it is very useful for analytics, e.g. if you want to know (in an employee table with name, department, and salary) the list of employees in each department with salary above average.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe count(..) over order by ..
also needs to be sorted, it will do a count over the window, which means a running count (over sorted rows) by default.
But yeah very useful for analytics indeed 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comment above was more about re-using the same functions over here - as I thought we might not want to support every aggregation function here too. But for me it sounds like a good idea to reuse them. Maybe @alamb has some ideas about it as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think SQL is confusing in this area -- as @jimexist says, all "normal" aggregate functions (e.g. sum, count, etc) are also valid window functions, but the reverse is not true. You can't use window functions (e.g. LAG, LEAD, etc) outside of a window clause.
Thus I think representing window functions as a new type of function, as this PR does, makes the most sense. They are different enough (e.g. require information on the incoming windows) that trying to wrangle them into the same structures as normal aggregates seems like it will get messy. Long term I would expect we have a UDWF (user defined window function) api as well.
Ideally the physical implementation for sum
/ count
/ etc can be mostly reused but in the plans I think they are different enough to warrant different plan structures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe count(..) over order by .. also needs to be sorted, it will do a count over the window, which means a running count (over sorted rows) by default.
Good point! Indeed.
Checking it out... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went over this PR quite carefully. Thank you very much @jimexist for the contribution! ❤️
❤️ -- this PR looks to be in great shape.
All I think this PR needs is a few more tests and it could be merged in.
I am not familiar with the ballista code, but it looked ok to me. @andygrove do you have any suggestions of who might be interested in those changes?
|
||
message WindowExprNode { | ||
oneof window_function { | ||
AggregateFunction aggr_function = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think SQL is confusing in this area -- as @jimexist says, all "normal" aggregate functions (e.g. sum, count, etc) are also valid window functions, but the reverse is not true. You can't use window functions (e.g. LAG, LEAD, etc) outside of a window clause.
Thus I think representing window functions as a new type of function, as this PR does, makes the most sense. They are different enough (e.g. require information on the incoming windows) that trying to wrangle them into the same structures as normal aggregates seems like it will get messy. Long term I would expect we have a UDWF (user defined window function) api as well.
Ideally the physical implementation for sum
/ count
/ etc can be mostly reused but in the plans I think they are different enough to warrant different plan structures.
))); | ||
} | ||
|
||
// window needs to operate on a single partition currently |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Eventually it would be cool to push the partitioning expressions into a RepartitionExec
so that we can execute the window functions in parallel on different windows but that is definitely an optimization for the future (not this initial PR) :)
@@ -2641,13 +2701,23 @@ mod tests { | |||
} | |||
|
|||
#[test] | |||
fn over_not_supported() { | |||
fn empty_over() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Great
// .iter() | ||
// .map(|expr| expr.try_into()) | ||
// .collect::<Result<Vec<_>, _>>()?; | ||
// // FIXME parse the window_frame data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks this is still WIP? Is the plan to complete this as part of this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @jimexist plans to implement the feature in several PRs as outlined in the PR description
I haven't had the time to go through yet; Since this is a large decomposable change, a suggestion here is to create a branch and merge this on that branch, so that we do not have incomplete features in master and allow PRs to that branch without an issue. Then merge the branch into master once its first iteration is ready (e.g. logical and 1 physical plan). (This is something that I wished to have when implementing large features; it gives the time to review in parts without the risk of having incomplete code in master) With that said, I am also fine risk it and merg to master; @jimexist do you have any preference how would you prefer to work on this here? :) |
I am supportive of putting this directly on master -- we have various other "not yet implemented" functionality in DataFusion and I think we can handle any potential confusion with additional documentation |
I wish to:
3 because although I agree with having a feature branch, the maintenance of continuous rebasing would be troublesome, and I can estimate the whole series of window function implementations take > 1 month. if I can keep the merged code isolated and structurally complete then there's little worry about breaking changes in future: I believe this PR:
|
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing design and implementation. I left two small comments, but it is regardless ready to merge.
Thank you very much, @jimexist ; great work.
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame | ||
pub fn window( | ||
&self, | ||
window_expr: impl IntoIterator<Item = Expr>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure we should use IntoIterator<Item = Expr>
for every field with 6 fields. This will produce a version of the compiled function for every combination, which IMO adds an unnecessary compile time and binary size.
IntoIterator
is more relevant when we want to avoid an extra allocation; these are very small vectors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tracked in #372
@@ -183,7 +182,7 @@ static TIMESTAMPS: &[DataType] = &[ | |||
]; | |||
|
|||
/// the signatures supported by the function `fun`. | |||
fn signature(fun: &AggregateFunction) -> Signature { | |||
pub fn signature(fun: &AggregateFunction) -> Signature { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub(super)
or pub(crate)
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tracked in #373
Codecov Report
@@ Coverage Diff @@
## master #334 +/- ##
==========================================
- Coverage 75.89% 74.94% -0.96%
==========================================
Files 144 146 +2
Lines 23771 24314 +543
==========================================
+ Hits 18040 18221 +181
- Misses 5731 6093 +362
Continue to review full report at Codecov.
|
Which issue does this PR close?
We'd like to support window function in three or more steps:
OVER
clause #298 empty over clauseCloses #359.
This is partly re #298.
Work to be done next: finish #298 and #299
Rationale for this change
this is a very stretch version of adding window function constructs to the planner, proto, etc.
What changes are included in this PR?
Are there any user-facing changes?