Full support for SUM overflow detection on decimal [databricks] #4272

Merged: 6 commits, Dec 6, 2021

Conversation

@revans2 (Collaborator) commented Dec 2, 2021

This fixes #3944

I ran some performance tests and saw no measurable difference between doing the extra overflow checking on the merge stage and skipping it. As such, I have turned it on so we can have true overflow detection on decimal SUMs no matter what. This also made the docs a lot simpler to write.

I would like to do some follow-on work to split apart AggregateFunctions.scala. It is way too large, but doing that as part of this PR didn't seem ideal.
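For context, here is a minimal CPU-side sketch of the kind of check this enables. It is illustrative only: the real implementation runs on GPU columns via cudf, and all of the names below are made up, not plugin APIs.

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}

// Illustrative sketch, not the plugin's code: sum decimal values and
// detect when the result no longer fits in `precision` total digits.
object DecimalSumOverflowSketch {
  def sumWithOverflowCheck(
      values: Seq[JBigDecimal],
      precision: Int): Option[JBigDecimal] = {
    val sum = values.foldLeft(JBigDecimal.ZERO)(_.add(_))
    // Largest unscaled value that fits in `precision` digits: 99...9
    val maxUnscaled = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE)
    if (sum.unscaledValue().abs().compareTo(maxUnscaled) > 0) {
      None // overflow detected
    } else {
      Some(sum)
    }
  }
}
```

In Spark, `None` here corresponds to a null SUM result (or an error in ANSI mode); the point of this PR is that the GPU path now performs the equivalent check in both the update and merge stages.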

@revans2 revans2 added the feature request New feature or request label Dec 2, 2021
@revans2 revans2 added this to the Nov 30 - Dec 10 milestone Dec 2, 2021
@revans2 revans2 self-assigned this Dec 2, 2021
@revans2 (Collaborator Author) commented Dec 2, 2021

build

@jlowe (Contributor) left a comment

Minor nits but otherwise looks OK. Would be good to have @abellina double-check the aggregation changes.

@revans2 (Collaborator Author) commented Dec 2, 2021

build

jlowe previously approved these changes Dec 2, 2021
@jlowe (Contributor) left a comment

Looks OK to me, but would be good to hear from @abellina on the agg changes before merging. At first I was shocked to not find the evaluateColumnar that was in the now-deleted processIncomingBatch, but I assume this is covered by AggHelper.preProcess.

@revans2 (Collaborator Author) commented Dec 2, 2021

> Looks OK to me, but would be good to hear from @abellina on the agg changes before merging. At first I was shocked to not find the evaluateColumnar that was in the now-deleted processIncomingBatch, but I assume this is covered by AggHelper.preProcess.

Yes. There was a bug where the output of processIncomingBatch was being bound to aggBufferAttributes in AggHelper.preProcess. That made it impossible for inputProjection to produce an output with a different number of values than aggBufferAttributes. I fixed it by moving the processing of inputProjection into AggHelper.preProcess, and then binding it to the input schema for an update, or to aggBufferAttributes only if it is a merge or is being forced to be a merge.
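A simplified stand-in for that binding decision (the types and names here are placeholders, not the actual spark-rapids code, which lives in AggHelper.preProcess):

```scala
// Placeholder type: the real code binds GPU expressions, not plain
// attribute lists.
case class Attribute(name: String)

// Pick the schema to bind inputProjection's output against, per the
// description above: the input schema for an update, the aggregation
// buffer attributes for a (possibly forced) merge.
def bindingSchema(
    isMerge: Boolean,
    forceMerge: Boolean,
    inputSchema: Seq[Attribute],
    aggBufferAttributes: Seq[Attribute]): Seq[Attribute] = {
  if (isMerge || forceMerge) {
    aggBufferAttributes
  } else {
    // Update stage: inputProjection may now produce a different number
    // of columns than aggBufferAttributes, which the old binding made
    // impossible.
    inputSchema
  }
}
```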

@abellina (Collaborator) commented Dec 2, 2021

Yeap, I'll take a crack at this review and have some comments by tomorrow.

abellina previously approved these changes Dec 3, 2021
@abellina (Collaborator) left a comment

I've tried a few things with this patch like forcing the fallback (i.e. always merge) and decimals with spark 3.2.0. I don't see issues with it. I had a small nit, and it's not a blocker.

I spent time on the boundInputReferences change. I don't think the new code removes cases we covered before, and it now adds the ability to have the differently shaped aggregation buffers that the overflow logic requires. The cast going away in processIncomingBatch threw me off, but it should not be needed if our types are correct at bind time.

@revans2 revans2 dismissed stale reviews from abellina and jlowe via a0ef068 December 3, 2021 20:27
@revans2 (Collaborator Author) commented Dec 3, 2021

build

@revans2 (Collaborator Author) commented Dec 3, 2021

@abellina and @jlowe please take another look

jlowe previously approved these changes Dec 3, 2021
abellina previously approved these changes Dec 3, 2021
@sameerz (Collaborator) commented Dec 4, 2021

build

@revans2 (Collaborator Author) commented Dec 6, 2021

build

@revans2 (Collaborator Author) commented Dec 6, 2021

It looks like one of my new tests is crashing Databricks.

[2021-12-04T15:19:07.772Z] [gw3] [ 69%] PASSED ../../src/main/python/hash_aggregate_test.py::test_hash_reduction_decimal_overflow_sum[31] 
[2021-12-04T15:19:13.634Z] [gw3] [ 69%] FAILED ../../src/main/python/hash_aggregate_test.py::test_hash_reduction_decimal_overflow_sum[30] 
[2021-12-04T15:31:55.481Z] E                   py4j.protocol.Py4JJavaError: An error occurred while calling o258684.collectToPython.
[2021-12-04T15:31:55.481Z] E                   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 7638.0 failed 1 times, most recent failure: Lost task 4.0 in stage 7638.0 (TID 41051) (10.2.128.5 executor driver): java.lang.OutOfMemoryError: Could not allocate native memory: std::bad_alloc: out_of_memory: RMM failure at:/home/jenkins/agent/workspace/jenkins-cudf_nightly-dev-github-554-cuda11/cpp/build/_deps/rmm-src/include/rmm/mr/device/arena_memory_resource.hpp:157: Maximum pool size exceeded
[2021-12-04T15:31:55.481Z] E                    at ai.rapids.cudf.ColumnView.binaryOpVS(Native Method)
[2021-12-04T15:31:55.481Z] E                    at ai.rapids.cudf.ColumnView.binaryOp(ColumnView.java:1205)
[2021-12-04T15:31:55.481Z] E                    at ai.rapids.cudf.ColumnView.binaryOp(ColumnView.java:1195)
[2021-12-04T15:31:55.481Z] E                    at ai.rapids.cudf.BinaryOperable.div(BinaryOperable.java:171)
[2021-12-04T15:31:55.481Z] E                    at org.apache.spark.sql.rapids.GpuDecimalSumHighDigits.$anonfun$columnarEval$10(AggregateFunctions.scala:720)
[2021-12-04T15:31:55.481Z] E                    at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
[2021-12-04T15:31:55.481Z] E                    at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)

@revans2 (Collaborator Author) commented Dec 6, 2021

So I did a little back-of-the-envelope math on this. It gets a little complicated because of some recent changes. We are doing a reduction on 10^8 values. If they are all 128-bit decimal values, then we end up with about 763 MB for all of the data in a single batch. But this should be split up among all of the tasks, which should match the number of CPU cores, 8 in this case I think. That means a reduction should need about 95-96 MiB of memory to hold the input, and much less for the output. In the worst case we need to make a copy of the input so we can get the higher order digits out for extra checking. Where it failed, it was trying to calculate this value and was in the middle of a divide. That means we should have needed at most 200 MiB of memory to hold a single task's data.

This is running on a T4, but it is split up 4 ways because we are running the tests 4 at a time. The 16 GiB is split into 5 parts, but each share ends up being 1/5th of what is free, and we take off 256 MiB of reserved space each time. That results in about 1321 MiB of total memory for the worst-case task, which should be more than enough.

That is, unless Databricks is somehow optimizing things in this case and we end up processing a lot more data in a single task. If we do all of the data in a single task, then we end up needing 1600 MiB, which is more than what we have available for most of the tasks. I'll see if I can reproduce this on my own.

@revans2 (Collaborator Author) commented Dec 6, 2021

Okay I have a bit more information.

I did my math wrong above, and it was in one of the simple places, not the complex ones :(. I used a long instead of a 128-bit value. This means that we actually need about 1.5 GiB to hold all of the data in a single batch. But it is being divided up between 2 tasks for some reason. Not sure why, but thank you Databricks. Also, the input columns have been factored out of the processing, so it is outputting batches sized as if each value were a long.

If you combine all of this together, along with the other columns that we calculate as a part of SUM, we end up with an input batch with no columns and 50,000,000 rows. This is turned into a 763 MiB column for the SUM itself, plus a 48 MiB boolean column to keep track of what is and isn't empty. When we try to calculate the higher order digits, it takes another 763 MiB to produce the input column, and then when it tries to do the divide to produce the output column, it fails to allocate enough memory. So it looks like I need to do a better job of properly sizing the input batches. I am not thrilled with this solution, but it is a rare problem, because it requires someone doing a really dumb job of multiplying numbers, which should not really happen in real life.
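Spelling the arithmetic out (just a sanity check of the figures above, with 1 MiB = 2^20 bytes):

```scala
// Sanity check of the sizes quoted above for the 50,000,000-row batch.
object SumBatchSizes extends App {
  val rows = 50000000L
  val MiB  = 1024.0 * 1024.0

  val sumCol     = rows * 16 / MiB // DECIMAL128 is 16 bytes/value: ~763 MiB
  val emptyFlags = rows * 1  / MiB // one byte per boolean flag:    ~48 MiB
  val highDigits = rows * 16 / MiB // copy made to extract the high digits
  // The divide then needs yet another DECIMAL128 output column, and that
  // is the allocation that blew past the ~1321 MiB arena.
  println(f"sum=$sumCol%.0f MiB, flags=$emptyFlags%.0f MiB, highDigits=$highDigits%.0f MiB")
}
```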

@revans2 revans2 dismissed stale reviews from abellina and jlowe via bf020c7 December 6, 2021 16:06
@revans2 (Collaborator Author) commented Dec 6, 2021

build

@revans2 (Collaborator Author) commented Dec 6, 2021

@jlowe @abellina please take a look at my "fix" for the problem. If you want me to file a follow-on issue I am happy to, but it comes down to our general problem with predicting memory usage and trying to stay under a max.
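For the record, the shape of the mitigation is to account for the temporaries when sizing batches. A hypothetical sketch under my own naming, not the actual patch:

```scala
// Hypothetical sketch: cap the rows per batch using the widest working
// set the aggregation will materialize (input plus the DECIMAL128 copy
// for the high-order digits and the divide's output), not just the raw
// input width. None of these names come from the actual change.
def targetRowsPerBatch(
    targetBatchBytes: Long,
    inputBytesPerRow: Long,
    tempBytesPerRow: Long): Long = {
  val bytesPerRow = math.max(1L, inputBytesPerRow + tempBytesPerRow)
  math.max(1L, targetBatchBytes / bytesPerRow)
}
```

The deeper issue, as noted, is that we have no general mechanism for predicting peak memory usage and staying under a maximum; this only tightens the estimate for this one case.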

@revans2 (Collaborator Author) commented Dec 6, 2021

build

Labels
feature request New feature or request
Development

Successfully merging this pull request may close these issues.

[FEA] Full support for sum with overflow on Decimal 128