Add support for arrays in hashaggregate [databricks] #6066
Conversation
Signed-off-by: Raza Jafri <[email protected]>
Resolved review thread (outdated) on sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala.
@razajafri when you get a chance, can this be retargeted to 22.10 please?
Signed-off-by: Raza Jafri <[email protected]>
Signed-off-by: Raza Jafri <[email protected]>
Signed-off-by: Raza Jafri <[email protected]>
@@ -335,7 +339,8 @@ def test_hash_reduction_decimal_overflow_sum(precision):
     # some optimizations are conspiring against us.
     conf = {'spark.rapids.sql.batchSizeBytes': '128m'})
 
 @pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
+@allow_non_gpu("ShuffleExchangeExec")
I don't understand why this is here. Do we not support shuffling a particular datatype yet support aggregations on it?
Thanks for reviewing. I have added Arrays to ShuffleExchangeExec.
Signed-off-by: Raza Jafri <[email protected]>
build
Signed-off-by: Raza Jafri <[email protected]>
build
def _grpkey_list_with_non_nested_children():
    return [[('a', RepeatSeqGen(ArrayGen(data_gen), length=3)),
             ('b', IntegerGen())] for data_gen in all_basic_gens + decimal_gens]
Why is this a function? It takes no parameters and isn't passed around as a function.
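A minimal sketch of the change being suggested here (illustrative, not the PR's final code): since the helper takes no parameters and is never passed around, it could be a module-level list. RepeatSeqGen, ArrayGen, IntegerGen, all_basic_gens, and decimal_gens are the plugin's integration-test data generators from data_gen.py.

# Illustrative alternative to the function above: a module-level list,
# built once at import time from the same generators.
_grpkey_list_with_non_nested_children = [
    [('a', RepeatSeqGen(ArrayGen(data_gen), length=3)),
     ('b', IntegerGen())]
    for data_gen in all_basic_gens + decimal_gens]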
Resolved review threads (outdated) on sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala and sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala.
Signed-off-by: Raza Jafri <[email protected]>
build
Signed-off-by: Raza Jafri <[email protected]>
build
@@ -3730,7 +3728,11 @@ object GpuOverrides extends Logging {
     // This needs to match what murmur3 supports.
     PartChecks(RepeatingParamCheck("hash_key",
       (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 +
-        TypeSig.STRUCT).nested(), TypeSig.all)),
+        TypeSig.STRUCT).nested() +
+        TypeSig.ARRAY.nested(
I am not an expert on TypeSig; it just seems a bit weird that we are calling nested() and then nested(child type) for ARRAY. Why do we need to call both?
I am not an expert on TypeSig either, and this was my misunderstanding. I thought the nested children are kept in separate buckets under their parents, but that's not true: we hold the initialTypes and childTypes as flat lists. I will fix this to avoid the confusion.
Signed-off-by: Raza Jafri <[email protected]>
def test_hash_grpby_sum_count_action(data_gen):
    assert_gpu_and_cpu_row_counts_equal(
        lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.sum('b'))
    )

@allow_non_gpu("ShuffleExchangeExec", "HashAggregateExec")
@pytest.mark.parametrize('data_gen', [_grpkey_nested_structs_with_array_child], ids=idfn)
def test_hash_grpby_sum_count_action_fallback(data_gen):
The commit comment says this is for testing shuffle exec fallback, but that's not really what this does. This is more about testing hash aggregate fallback (and arguably is a duplicate of the existing test_hash_agg_with_struct_of_array_fallback). The shuffle is falling back because both sides are also falling back, and it is inefficient to shuffle on the GPU to/from CPU exec nodes.

To have a test more focused on shuffle, it should be in repart_test.py and use something like repartition to force a shuffle. See other tests in repart_test.py for examples. Make sure the test passes with supported types and falls back for unsupported types.
So Spark is deciding not to shuffle this. I realized something was off about this right after pushing. I am not seeing it hit the wrapPart method, which was confusing me. Thank you for clarifying why it's not replacing the ShuffleExec when it should.
> Thank you for clarifying why it's not replacing the ShuffleExec when it should.

But that's just it: it should not replace the shuffle even if it wanted to. Shuffling with arrays-of-struct as the partitioning key should not be supported by ShuffleExec, just like we cannot support grouping with arrays-of-struct. I was wrong above; repartition is not what we want here, because that doesn't involve partitioning on any particular column. We need to test partitioning on array-of-struct keys, which is not supported. I'm not sure we can easily verify at the integration-test level that we're doing this properly, as I think we would need some way for Spark to plan hash partitioning on array-of-struct into GPU operations that support it while the shuffle does not. But if we can do an operation that needs hash partitioning of array-of-struct (like groupby), then we should be able to hash partition it as well.
I have added an integration test. I modified a repart_test by adding array of structs as a type and then refactored that into a separate test. I know you mentioned that repartition won't work, but I wanted to run this by you to see if this does.
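For context, a hedged sketch of the kind of test being described; the test name and generator combination are assumptions modeled on repart_test.py conventions, while allow_non_gpu, assert_gpu_fallback_collect, gen_df, ArrayGen, StructGen, and long_gen come from the plugin's integration-test helpers.

# Hedged sketch: repartitioning on an array-of-struct key should fall back
# to the CPU ShuffleExchangeExec (test shape is illustrative, not the PR's
# final code).
@allow_non_gpu("ShuffleExchangeExec")
@pytest.mark.parametrize('data_gen', [ArrayGen(StructGen([('c0', long_gen)]))], ids=idfn)
def test_repartition_by_array_of_struct_fallback(data_gen):
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, [('a', data_gen)], length=100).repartition(4, 'a'),
        'ShuffleExchangeExec')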
Signed-off-by: Raza Jafri <[email protected]>
This reverts commit f26c0ff. Signed-off-by: Raza Jafri <[email protected]>
Resolved review thread (outdated) on sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala.
Signed-off-by: Raza Jafri <[email protected]>
build
1 similar comment
build
CI is failing on DB because ShuffleExchangeExec isn't found in the plan. Could be because DB is optimizing the plan differently from vanilla Spark. I will see if I can write another test for DB and skip this test for it.
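A hedged sketch of one way to do that skip: is_databricks_runtime() is the helper the plugin's integration tests import from spark_session, and the reason string here is illustrative.

from spark_session import is_databricks_runtime

# Skip on Databricks, where the optimizer may plan this query without the
# ShuffleExchangeExec the assertion looks for.
@pytest.mark.skipif(is_databricks_runtime(),
                    reason='Databricks plans this query without a ShuffleExchangeExec')
def test_repartition_by_array_of_struct_fallback(data_gen):
    ...  # body as in the sketch above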
Signed-off-by: Raza Jafri <[email protected]>
Signed-off-by: Raza Jafri <[email protected]>
build
This reverts commit b6bd34b. Signed-off-by: Raza Jafri <[email protected]>
Signed-off-by: Raza Jafri <[email protected]>
build
@jlowe can you do the honors one more time?
…6066)" This reverts commit 122e107. Signed-off-by: Raza Jafri <[email protected]>
…#6679) This reverts commit 122e107. Signed-off-by: Raza Jafri <[email protected]> Signed-off-by: Raza Jafri <[email protected]> Co-authored-by: Raza Jafri <[email protected]>
…6066)" (NVIDIA#6679) This reverts commit 122e107. Signed-off-by: Raza Jafri <[email protected]> Signed-off-by: Raza Jafri <[email protected]> Co-authored-by: Raza Jafri <[email protected]>
…VIDIA#6066)" (NVIDIA#6679)" This reverts commit c05ac2d. Signed-off-by: Raza Jafri <[email protected]>
* Revert "Revert "Add support for arrays in hashaggregate [databricks] (#6066)" (#6679)" This reverts commit c05ac2d and adds tests * Add test for aggregation on array * updated docs --------- Signed-off-by: Raza Jafri <[email protected]>
This PR enables HashAggregate for Arrays.
fixes #4656
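A hedged end-to-end illustration of what this enables (the DataFrame and column names are illustrative, and spark is assumed to be an active SparkSession with the plugin enabled): grouping by an ARRAY column can now be planned into GPU HashAggregateExec instead of falling back to the CPU.

from pyspark.sql import functions as f

# Group by an array column; with this PR the aggregation can run on the GPU.
df = spark.createDataFrame(
    [([1, 2], 10), ([1, 2], 5), ([3], 1)],
    'a array<int>, b int')
df.groupby('a').agg(f.sum('b')).collect()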
Signed-off-by: Raza Jafri [email protected]