[WIP] Support GpuCollectList/GpuCollectSet in groupBy aggregation #2804
Conversation
Signed-off-by: sperlingxx <[email protected]>
@@ -371,6 +374,107 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
        .agg(f.sum('c')),
        conf=conf)
_repeat_agg_column_for_collect_op = [
Why "hash_aggregate"? In cudf, collect list/set are only sort_aggregages.
@ttnghia This test is not named after how cudf implements something. What is more, cudf hides that detail from users, so to any test outside of cudf itself it is a hidden implementation detail. The test is named after how Spark implements the operator, and we are trying to get coverage on that operator.

That said, CollectList and CollectSet apparently show up as ObjectHashAggregateExec, and there are also some SortAggregateExec tests in here too. At some point we should either split this up into separate files or rename it.
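For reference, a minimal sketch (the CollectPlanDemo object and sample data are invented for illustration) of why the coverage lands on ObjectHashAggregateExec: Spark plans CollectList/CollectSet, being TypedImperativeAggregate functions, through that exec by default.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, collect_set}

object CollectPlanDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("collect-demo").getOrCreate()
  import spark.implicits._

  // collect_list/collect_set are TypedImperativeAggregate functions, so with
  // spark.sql.execution.useObjectHashAggregateExec=true (the default) Spark
  // plans them through ObjectHashAggregateExec rather than HashAggregateExec.
  val df = Seq((1, "a"), (1, "b"), (1, "a"), (2, "c")).toDF("k", "v")
  df.groupBy($"k")
    .agg(collect_list($"v"), collect_set($"v"))
    .explain() // ObjectHashAggregateExec should appear in the printed plan

  spark.stop()
}
```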
  throw new UnsupportedOperationException("CollectSet is not yet supported in reduction")
override lazy val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar =
  throw new UnsupportedOperationException("CollectSet is not yet supported in reduction")
override lazy val updateAggregate: Aggregation = Aggregation.collectSet()
Hmm, is there any way to call Aggregation.collectList if the data is partitioned into more than one batch? We only need lists (which may contain duplicates) for the intermediate results. Calling collectSet to generate the intermediate results is expensive, as it involves unnecessarily executing drop_list_duplicates on the temporary lists.
This is a lot of change, and I think we need to look into the proper way to deal with ObjectHashAggregate. It feels kind of hacked together, and I want to get a real design put in place before we push this in.
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.STRUCT).nested(),
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.BINARY +
    TypeSig.STRUCT).nested()
  .withPsNote(TypeEnum.BINARY, "Marking BINARY as plugin-supported is only to " +
What exactly does this mean and how does this help an end user?
  .withPsNote(TypeEnum.STRUCT, "Round-robin partitioning is not supported for nested " +
    s"structs if ${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
  .withPsNote(TypeEnum.ARRAY, "Round-robin partitioning is not supported if " +
    s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
  .withPsNote(TypeEnum.MAP, "Round-robin partitioning is not supported if " +
    s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true"),
    s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
  .withPsNote(TypeEnum.BINARY, "Marking BINARY as plugin-supported is only to " +
Why do we need a restriction on this?
@@ -3004,6 +3034,19 @@ object GpuOverrides {
      .withPsNote(TypeEnum.STRUCT, "not allowed for grouping expressions"),
    TypeSig.all),
  (agg, conf, p, r) => new GpuHashAggregateMeta(agg, conf, p, r)),
exec[ObjectHashAggregateExec](
  "The backend for hash based aggregations",
I think we need a clearer description to distinguish it from the regular HashAggregateExec.
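As a purely hypothetical example of more distinguishing wording (not text from this PR), the description string could call out the typed-imperative aspect:

```scala
// Sketch only: a candidate description distinguishing this exec from the
// regular HashAggregateExec registration above.
val objectHashAggregateDesc: String =
  "The backend for hash based aggregations that use TypedImperativeAggregate " +
    "functions (e.g. collect_list and collect_set), which Spark plans as " +
    "ObjectHashAggregateExec instead of HashAggregateExec"
```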
def filterNonAggBufBinaryExpressions(expressions: Seq[Expression]): Seq[Expression] = {
  expressions.filter {
    case AttributeReference("buf", BinaryType, _, _) => false
What makes "buf" special? This is not good enough.
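One possible direction (a sketch, assuming the caller can supply the TypedImperativeAggregate functions in play; filterTypedImperativeBuffers is a hypothetical helper) is to match the declared buffer attributes by exprId rather than by the hard-coded name "buf":

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate

// Sketch: drop the serialized (BinaryType) buffers declared by the given
// TypedImperativeAggregate functions, matching by exprId rather than by
// the attribute's name.
def filterTypedImperativeBuffers(
    expressions: Seq[Expression],
    aggregates: Seq[TypedImperativeAggregate[_]]): Seq[Expression] = {
  // The buffer attributes actually declared by the aggregate functions.
  val bufferAttrs = AttributeSet(aggregates.flatMap(_.aggBufferAttributes))
  expressions.filterNot {
    case a: Attribute => bufferAttrs.contains(a)
    case _ => false
  }
}
```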
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
  extends GpuCollectBase[CollectSetAggregation] {

  override lazy val updateExpressions: Seq[GpuExpression] = new CudfCollectSet(inputBuf) :: Nil

  override lazy val mergeExpressions: Seq[GpuExpression] = new CudfMergeSets(outputBuf) :: Nil
I'm thinking of totally removing CudfCollectSet and CudfMergeSets. We just call CudfCollectList in updateExpressions, and CudfMergeLists in mergeExpressions. During evaluateExpression, we call CudfDropListDuplicates on the merged lists.
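A minimal sketch of that idea (the trait and method shapes are hypothetical; it assumes the cudf Java bindings expose Aggregation.collectList(), Aggregation.mergeLists(), and a dropListDuplicates column method, as referenced in this thread):

```scala
import ai.rapids.cudf

// Sketch: implement collect_set via list aggregations, deduplicating only once.
trait CollectSetViaListsSketch {
  // Update: gather raw per-group lists; duplicates are fine in the buffer.
  lazy val updateAggregate: cudf.Aggregation = cudf.Aggregation.collectList()

  // Merge: concatenate partial lists across batches, still without dedup.
  lazy val mergeAggregate: cudf.Aggregation = cudf.Aggregation.mergeLists()

  // Evaluate: deduplicate exactly once, on the fully merged lists.
  def evaluateExpression(merged: cudf.ColumnVector): cudf.ColumnVector =
    merged.dropListDuplicates()
}
```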
Is this superseded by #2971?
Yes, I closed it.
Current PR is to support GpuCollectList and GpuCollectSet in groupBy aggregation. CollectList and CollectSet are the first two TypedImperativeAggregate functions for which we attempt to provide GPU support. Unlike DeclarativeAggregate and other ImperativeAggregate implementations (such as PivotFirst), TypedImperativeAggregate creates aggBufferAttributes of BinaryType (for serialization). Meanwhile, the GPU counterparts store aggBufferAttributes simply as the data type of the update expressions (Array[elementType] for the collect ops). Therefore, this PR involves three additional tasks:

1. Add GpuObjectHashAggregateMeta.convertToGpu. Although ObjectHashAggregateExec doesn't cover all cases of TypedImperativeAggregate, it covers the most common usages, and we can support other situations later (for instance, an aggregation containing multiple distincts, which is rewritten by the optimizer).
2. Mark TypeSig.BINARY as plugin-supported for ObjectHashAggregateExec, HashPartitioning and ShuffleExchangeExec, and check the validity of BinaryType columns in the tagForGpu methods.
3. Modify GpuHashAggregateExec.setupReferences to make it work with TypedImperativeAggregate in PartialMerge mode.

I labeled this PR as WIP, since I am not quite confident whether the above approaches are appropriate or not.
to make it work with TypedImperativeAggregate in PartialMerge mode.I labeled this PR as WIP, since I am not quite confident whether the above approaches are appropriate or not.