Support GpuCollectList and GpuCollectSet as TypedImperativeAggregate #2971
Conversation
Signed-off-by: sperlingxx <[email protected]>
build
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
build
#2916 was postponed to the 21.10 release. Given we're in burndown, I think this should be retargeted to branch-21.10 when that is available (hopefully soon).
pre-merge for 21.10 is ready
Signed-off-by: sperlingxx <[email protected]>
build
Re-targeted to the new branch.
Signed-off-by: sperlingxx <[email protected]>
build
@sperlingxx first pass through your changes.
* Base class for metadata around `SortAggregateExec` and `ObjectHashAggregateExec`, which may
* contain TypedImperativeAggregate functions in aggregate expressions.
*/
abstract class GpuNoHashAggregateMeta[INPUT <: SparkPlan](
So this is called `GpuNoHashAggregateMeta`, but `ObjectHashAggregateExec` inherits from it. I think we should come up with a different name for `GpuNoHashAggregateMeta`, but I can understand why you chose this.
Because `GpuSortAggregateMeta` also inherits from it. When `spark.sql.execution.useObjectHashAggregateExec` is set to false, Spark Catalyst will plan a `SortAggregateExec` instead of an `ObjectHashAggregateExec` for an Aggregate (logical plan) with TypedImperativeAggregate functions.
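A minimal illustration of that planner behavior, assuming an active SparkSession `spark` (the column names are arbitrary):

```python
import pyspark.sql.functions as f

# With ObjectHashAggregateExec disabled, Catalyst plans a SortAggregateExec
# for TypedImperativeAggregate functions such as collect_list.
spark.conf.set('spark.sql.execution.useObjectHashAggregateExec', 'false')

df = spark.range(100).selectExpr('id % 10 as a', 'id as b')
df.groupBy('a').agg(f.collect_list('b')).explain()
# The physical plan should now show SortAggregate instead of ObjectHashAggregate.
```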
I also think we can come up with a better name, even a long name like `GpuTypeImperativeSupportedAggregateExecMeta`.
val column = result.getColumn(i)
val rapidsType = GpuColumnVector.getRapidsType(dataTypes(i))
// extra type conversion check for nested types
if ((rapidsType.equals(DType.LIST) || rapidsType.equals(DType.STRUCT)) &&
Why not call `typeConversionAllowed` for all columns (i.e., no need to special-case LIST and STRUCT)?
For non-nested types, type casting may happen here, such as casting INT to LONG, so the `typeConversionAllowed` check may fail even when the conversion is intended. For nested types, no type conversion is necessary (or available), which means the check is safe. What's more, `GpuColumnVector.getRapidsType` doesn't match any child types, so we check whether they match via `typeConversionAllowed`.
I would prefer to see us handle things in terms of DataType instead of DType. `getRapidsType` is something we removed when we started to work on nested types, because it loses a lot of information and can easily be misused.
As a side note, are we seeing issues with this? Are we collecting a list/struct and the types are not correct?
….scala Co-authored-by: Alessandro Bellina <[email protected]>
Co-authored-by: Alessandro Bellina <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
build
Signed-off-by: sperlingxx <[email protected]>
build
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
build
Thanks @sperlingxx. I am sorry for the delay, I'll take a look again today.
This mostly looks good. My main problem with this is:
In addition, Aggregate stacks with TypedImperativeAggregate functions may lead to unexpected crashes if the stack partially falls back to CPU, because the GPU data types are inconsistent with their CPU counterparts. This problem will be fixed in task 4 of #2916. To avoid this kind of unexpected crash for now, this PR introduces the "associated fallback" mechanism, which only affects Aggregate plans containing TypedImperativeAggregate functions.
We cannot have CollectList and CollectSet on by default if there are chances that we can crash.
@@ -486,16 +486,32 @@ private static DType toRapidsOrNull(DataType type) {
} else {
return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale());
}
} else if (supportNestedType) {
Why do we need this? We removed nested types because a DType.LIST is missing a lot of information, and if we are not careful with this type of API it can cause bugs.
I agree. So, I reverted this change.
"As a side note, are we seeing issues with this? Are we collecting a list/struct and the types are not correct?"
For CollectList and CollectSet, I believe the types are always correct. But I am not sure whether some aggregations which we plan to support in the future might produce inconsistent nested types.
# Queries with multiple distinct aggregations will fall back to CPU if they also contain
# collect aggregations, because the Spark optimizer will insert expressions like `If` and
# `First` when rewriting distinct aggregates, while `GpuIf` and `GpuFirst` don't support
# the data type of collect aggregations (ArrayType).
Can you add in references to the issues to support these for GpuIf and GpuFirst? If they do not exist, then could you please file them?
Yes, I filed the issue.
count(distinct b),
count(distinct c)
from tbl group by a"""
assert_gpu_and_cpu_are_equal_sql(
Typically when we have a fallback test, we want an assertion that verifies part of the code actually did fall back, like `assert_gpu_sql_fallback_collect`.
I added the check for fallback capture.
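For reference, a minimal sketch of such a fallback check, modeled on the helpers used elsewhere in this PR; the test name is hypothetical and `data_gen` is assumed to be parametrized as in the surrounding tests:

```python
import pyspark.sql.functions as f
from asserts import assert_gpu_fallback_collect
from data_gen import gen_df

def test_collect_with_multi_distinct_fallback(data_gen):
    # Mixing a collect aggregation with multiple distinct aggregations should
    # fall back, so verify that ObjectHashAggregateExec actually ran on the
    # CPU; the helper also checks that the CPU and GPU results are equal.
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .groupby('a')
            .agg(f.sort_array(f.collect_list('b')),
                 f.countDistinct('b'),
                 f.countDistinct('c')),
        cpu_fallback_class_name='ObjectHashAggregateExec')
```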
@pytest.mark.parametrize('conf', [_nans_float_conf_partial, _nans_float_conf_final], ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
    conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
Please always copy `conf` before doing an update. We have seen issues with global values being modified by tests doing this, and it is just good practice.
Fixed.
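A minimal sketch of the suggested pattern, using a plain dict copy (`_nans_float_conf_partial`, `idfn`, and `data_gen` come from the test module under review):

```python
import pytest

@pytest.mark.parametrize('conf', [_nans_float_conf_partial, _nans_float_conf_final], ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
    # Copy the shared conf dict before updating it, so the module-level
    # configuration is not mutated across test runs.
    local_conf = dict(conf)
    local_conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
```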
Yes, and I believe we can get rid of the potential crashes with "associated fallback".
Signed-off-by: sperlingxx <[email protected]>
build
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.sort_array(f.collect_set('b'))),
conf=local_conf)
assert_gpu_fallback_collect(
This also verifies that the CPU and the GPU results are equal, so you don't need both parts.
Refined.
cpu_fallback_class_name='ObjectHashAggregateExec',
conf=local_conf)
# test with single Distinct
assert_gpu_and_cpu_are_equal_collect(
Here too: if the tests are slightly different, then let's have a different test function for each test case.
Refined.
val stageMetas = mutable.ListBuffer[GpuBaseAggregateMeta[_]]()
// Go through all Aggregate stages to check whether all stages are GPU supported. If not,
// we fall back all GPU-supported stages to CPU.
if (recursiveCheckForFallback(meta, logicalPlan, stageMetas)) {
So, just to be sure that I understand this correctly: when AQE is not enabled, we go through and see if we can fall back or not, and if any one of them fell back, then we mark all of them as needing to fall back. Is that correct? What about when AQE is enabled and the first aggregation (the partial one) may have already executed? We can mark it to fall back to the CPU, but that will do nothing because it has already executed. How do we handle that case?
To adapt to AQE, I took advantage of `gpuSupportedTag`, which was introduced by @andygrove. I added the line `wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))` in `GpuTypedImperativeSupportedAggregateExecMeta.tagPlanForGpu` to retrieve the information about GPU support which was captured and cached during `GpuQueryStagePrepOverrides`.
I understand, but does that run over the entire plan at some point, or just sections of the plan? If it is the entire plan, it would be good to explain that in a comment, because otherwise it looks like we have cases where we can crash.
I added some comments on this section.
meta: GpuTypedImperativeSupportedAggregateExecMeta[_]): Unit = {
  // We only run the check for final stages which contain TypedImperativeAggregate.
  val needToCheck = meta.agg.aggregateExpressions.exists(e =>
    (e.mode == Final || e.mode == Complete) &&
FYI, `Complete` means that the entire aggregation is happening in one pass, so there should be no need to check for a corresponding first part of the aggregation, because there should be none. This only shows up on Databricks right now, so it is not super simple to test.
I removed `Complete`.
We should also test this on Databricks if you have not done so already.
I tested it on Databricks (https://blossom.nvidia.com/sw-gpu-spark-jenkins/view/Testing/job/lc-db/19/execution/node/60/log/). Everything looks fine.
Signed-off-by: sperlingxx <[email protected]>
build
It looks good. I mostly want to be sure that we have run the tests on Databricks. And it would be nice to have some of the comments in the fallback code updated to explain how it works with AQE, so it is simpler to follow.
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.count('b')),
It seems every `collect_*` function is wrapped in a `sort_array`. Is that on purpose? Could a comment be added somewhere on why? Especially because we have `@ignore_order`, so I was curious.
It is because `@ignore_order` only ensures the order between rows, while in these cases we also need to take care of the ordering within each array produced by the collect ops. I added this comment to the test file.
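A small illustration of the distinction, assuming a DataFrame `df` with columns `a` and `b`:

```python
import pyspark.sql.functions as f

# @ignore_order sorts the output *rows* before comparing CPU and GPU results,
# but collect_list builds each array in a nondeterministic order. Sorting the
# array itself makes the per-row comparison deterministic as well.
df.groupby('a').agg(f.sort_array(f.collect_list('b')))
```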
Makes sense. I'd extend `ignore_order` to do the sorting after the collect for the array case; that way you would be able to test the aggregate in another way a user is likely to invoke it.
I'm OK if you want to do that as a follow-up also.
I would be fine with an `@ignore_array_order` or something like that. I'd rather not have `@ignore_order` cover both.
But let's do it as a follow-on issue, if we do it at all.
At this point I only had the comment on the tests; otherwise it is looking good so far.
Signed-off-by: sperlingxx <[email protected]>
build
Signed-off-by: sperlingxx <[email protected]>
This PR supports GpuCollectList and GpuCollectSet as TypedImperativeAggregate, which is task 3 of #2916. In this PR, we also introduce TypedImperativeAggExprMeta and GpuNoHashAggregateMeta to provide general support for TypedImperativeAggregate functions.
In addition, Aggregate stacks with TypedImperativeAggregate functions may lead to unexpected crashes if the stack partially falls back to CPU, because the GPU data types are inconsistent with their CPU counterparts. This problem will be fixed in task 4 of #2916. To avoid this kind of unexpected crash for now, this PR introduces the "associated fallback" mechanism, which only affects Aggregate plans containing TypedImperativeAggregate functions.
The "associated fallback" falls back all stages of an Aggregate (logical plan) to CPU once we need to fall back any stage of the plan. The "associated fallback" will be triggered on each final stage of Aggregate which contains TypedImperativeAggregate functions. It traverses the plan tree to collect all stages of current Aggregate (logical plan), and to determine whether to fallback them entirely or not. In addition, the "associated fallback" also works when AQE is on.