-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-14664][SQL] Implement DecimalAggregates optimization for Window queries #12421
Conversation
@dongjoon-hyun good catch! Can't we just make the optimizer respect the window expression? I.e. wrap the entire window expression in the |
Oh, thank you for quick review. Actually, I tried to do first like that. There occurs exceptions about type mismatch due to the difference from input schema. So, it seems not so straightforward to me. However, I'll try again according to your comments. Thank you, @hvanhovell ! |
Test build #55940 has finished for PR 12421 at commit
|
Hi, @hvanhovell . According to your comments, I've worked this for a week, but I'm afraid of being a bottleneck. |
Test build #56602 has finished for PR 12421 at commit
|
Hi, @hvanhovell . |
Rebased. |
Test build #56840 has finished for PR 12421 at commit
|
@@ -1321,7 +1321,9 @@ object DecimalAggregates extends Rule[LogicalPlan] { | |||
/** Maximum number of decimal digits representable precisely in a Double */ | |||
private val MAX_DOUBLE_DIGITS = 15 | |||
|
|||
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { | |||
def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressionsDown { | |||
case we @ WindowExpression(AggregateExpression(_, _, _, _), _) => we |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the following work for sum?
case we @ WindowExpression(ae @ AggregateExpression(Sum(e @ DecimalType.Expression(prec, scale)), _, _, _), _) if prec + 10 <= MAX_LONG_DIGITS =>
MakeDecimal(we.copy(windowFunction = ae.copy(aggregateFunction = Sum(UnscaledValue(e))), prec + 10, scale))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, thank you for coming back, @hvanhovell . I tried that last week that, but it faced the following error in WindowExec.scala. (last week, it was Window.scala)
Unsupported window function: cast(((avg(UnscaledValue(a#14)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
java.lang.RuntimeException: Unsupported window function: cast(((avg(UnscaledValue(a#14)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.WindowExec$$anonfun$windowFrameExpressionFactoryPairs$1$$anonfun$apply$2.apply(WindowExec.scala:183)
After fixing that, there occured errors at compiling generated code (codeGen result) due to the mismatched type between input schema (Decimal to Long and vice versa). I did try to change the input schema or BoundedReferences, too.
Actually, this observation is the same one I mentioned 9 days ago. I tried other approaches in other ways. But nothing was a clean solution for this. So, I decided to ask your help.
Oh, my bad. Today, from your code, I tried again to implement DecimalAggregate optimizer for Windows and handled to implement it. I think I made some mistake during last week. Thank you so much for your review and guiding, @hvanhovell ! After this PR passes the final Jenkins test, I will update the content of PR and JIRA, too. |
Test build #56881 has finished for PR 12421 at commit
|
The failures are
|
Test build #56885 has finished for PR 12421 at commit
|
Hi, @hvanhovell . |
Test build #56935 has finished for PR 12421 at commit
|
More test cases are added. |
Rebased. |
Test build #56968 has finished for PR 12421 at commit
|
Hi, @hvanhovell . |
Hi, @rxin . |
@dongjoon-hyun I am gonna get to this today. |
|
||
test("SPARK-14664: Decimal sum/avg over window should work.") { | ||
checkAnswer( | ||
sqlContext.sql("select sum(a) over () from (select explode(array(1.0,2.0,3.0)) a) t"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: you can also use select sum(a) over () from values 1.0, 2.0, 3.0 x(a)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you. I will use this consice form.
@dongjoon-hyun The PR is in pretty good shape. I left a few small comments/questions. |
@hvanhovell . |
Test build #57147 has finished for PR 12421 at commit
|
Test build #57149 has finished for PR 12421 at commit
|
LGTM |
Merging to master. Thanks! |
Thank you so much, @hvanhovell ! |
What changes were proposed in this pull request?
This PR aims to implement decimal aggregation optimization for window queries by improving existing
DecimalAggregates
. Historically,DecimalAggregates
optimizer is designed to transform generalsum/avg(decimal)
, but it breaks recently added windows queries like the followings. The following queries work well without the currentDecimalAggregates
optimizer.Sum
Average
After this PR, those queries work fine and new optimized physical plans look like the followings.
Sum
Average
In this PR, SUM over window pattern matching is based on the code of @hvanhovell ; he should be credited for the work he did.
How was this patch tested?
Pass the Jenkins tests (with newly added testcases)