-
Notifications
You must be signed in to change notification settings - Fork 242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refine GpuHashAggregateExec.setupReference #2917
Changes from 3 commits
3dcce50
cf39647
0af7991
ca40f1a
d9e3ea1
1bb0e81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -627,83 +627,67 @@ class GpuHashAggregateIterator( | |||||
val aggModeCudfAggregates = | ||||||
AggregateUtils.computeAggModeCudfAggregates(aggregateExpressions, aggBufferAttributes) | ||||||
|
||||||
// boundInputReferences is used to pick out of the input batch the appropriate columns | ||||||
// for aggregation. | ||||||
// | ||||||
// expressions to pick input to the aggregate, and finalize the output to the result projection. | ||||||
// | ||||||
// Pick update distinct attributes or input projections for Partial | ||||||
val (distinctAggExpressions, nonDistinctAggExpressions) = aggregateExpressions.partition( | ||||||
_.isDistinct) | ||||||
val updateExpressionsDistinct = | ||||||
distinctAggExpressions.flatMap( | ||||||
_.aggregateFunction.updateExpressions) | ||||||
val updateAttributesDistinct = | ||||||
distinctAggExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) | ||||||
val inputProjectionsDistinct = | ||||||
distinctAggExpressions.flatMap(_.aggregateFunction.inputProjection) | ||||||
|
||||||
// Pick merge non-distinct for PartialMerge | ||||||
val mergeExpressionsNonDistinct = | ||||||
nonDistinctAggExpressions | ||||||
// - PartialMerge with Partial mode: we use the inputProjections or distinct update expressions | ||||||
sperlingxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
// for Partial and non distinct merge expressions for PartialMerge. | ||||||
// - Final or PartialMerge-only mode: we pick the columns in the order as handed to us. | ||||||
// - Partial or Complete mode: we use the inputProjections or distinct update expressions. | ||||||
sperlingxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
val boundInputReferences = | ||||||
if (modeInfo.hasPartialMerge && modeInfo.uniqueModes.contains(Partial)) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bonus points => There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We used |
||||||
// The 3rd stage of AggWithOneDistinct, which combines (partial) reduce-side | ||||||
// nonDistinctAggExpressions and map-side distinctAggExpressions. For this stage, we need to | ||||||
// switch the position of distinctAttributes and nonDistinctAttributes. | ||||||
// | ||||||
// The schema of the 2nd stage's outputs: | ||||||
// groupingAttributes ++ distinctAttributes ++ nonDistinctAggBufferAttributes | ||||||
// | ||||||
// The schema of the 3rd stage's expressions: | ||||||
// nonDistinctMergeAggExpressions ++ distinctPartialAggExpressions | ||||||
|
||||||
val (distinctAggExpressions, nonDistinctAggExpressions) = aggregateExpressions.partition( | ||||||
_.isDistinct) | ||||||
|
||||||
// Pick merge non-distinct for PartialMerge | ||||||
val nonDistinctExpressions = nonDistinctAggExpressions | ||||||
.flatMap(_.aggregateFunction.mergeExpressions) | ||||||
.map(_.asInstanceOf[CudfAggregate].ref) | ||||||
val mergeAttributesNonDistinct = | ||||||
nonDistinctAggExpressions.flatMap( | ||||||
_.aggregateFunction.aggBufferAttributes) | ||||||
|
||||||
// Partial with no distinct or when modes are empty | ||||||
val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions | ||||||
.flatMap(_.aggregateFunction.inputProjection) | ||||||
|
||||||
var distinctAttributes = Seq[Attribute]() | ||||||
var distinctExpressions = Seq[Expression]() | ||||||
var nonDistinctAttributes = Seq[Attribute]() | ||||||
var nonDistinctExpressions = Seq[Expression]() | ||||||
modeInfo.uniqueModes.foreach { | ||||||
case PartialMerge => | ||||||
nonDistinctAttributes = mergeAttributesNonDistinct | ||||||
nonDistinctExpressions = mergeExpressionsNonDistinct | ||||||
case Partial => | ||||||
// Partial with distinct case | ||||||
val updateExpressionsCudfAggsDistinct = | ||||||
updateExpressionsDistinct.filter(_.isInstanceOf[CudfAggregate]) | ||||||
.map(_.asInstanceOf[CudfAggregate].ref) | ||||||
if (inputProjectionsDistinct.exists(p => !p.isInstanceOf[NamedExpression])) { | ||||||
// Case of distinct average we need to evaluate the "GpuCast and GpuIsNotNull" columns. | ||||||
// Refer to how input projections are setup for GpuAverage. | ||||||
// In the case where we have expressions to evaluate, pick the unique attributes | ||||||
// references from them as you only have one column for it before you start evaluating. | ||||||
distinctExpressions = inputProjectionsDistinct | ||||||
distinctAttributes = inputProjectionsDistinct.flatMap(ref => | ||||||
ref.references.toSeq).distinct | ||||||
} else { | ||||||
distinctAttributes = updateAttributesDistinct | ||||||
distinctExpressions = updateExpressionsCudfAggsDistinct | ||||||
} | ||||||
case _ => | ||||||
} | ||||||
val inputBindExpressions = groupingExpressions ++ nonDistinctExpressions ++ distinctExpressions | ||||||
val resultingBindAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes | ||||||
|
||||||
val finalProjections = groupingExpressions ++ | ||||||
aggregateExpressions.map(_.aggregateFunction.evaluateExpression) | ||||||
|
||||||
// boundInputReferences is used to pick out of the input batch the appropriate columns | ||||||
// for aggregation | ||||||
// - Partial Merge mode: we use the inputBindExpressions which can be only | ||||||
// non distinct merge expressions. | ||||||
// - Partial or Complete mode: we use the inputProjections or distinct update expressions. | ||||||
// - Partial, PartialMerge mode: we use the inputProjections or distinct update expressions | ||||||
// for Partial and non distinct merge expressions for PartialMerge. | ||||||
// - Final mode: we pick the columns in the order as handed to us. | ||||||
val boundInputReferences = if (modeInfo.hasPartialMerge) { | ||||||
GpuBindReferences.bindGpuReferences(inputBindExpressions, resultingBindAttributes) | ||||||
} else if (modeInfo.hasFinalMode) { | ||||||
val nonDistinctAttributes = nonDistinctAggExpressions | ||||||
.flatMap(_.aggregateFunction.aggBufferAttributes) | ||||||
|
||||||
// Pick update distinct attributes or input projections for Partial | ||||||
val distinctExpressions = distinctAggExpressions.flatMap(_.aggregateFunction.inputProjection) | ||||||
val distinctAttributes = distinctExpressions.flatMap(_.references.toSeq).distinct | ||||||
|
||||||
val inputProjections = groupingExpressions ++ nonDistinctExpressions ++ distinctExpressions | ||||||
val inputAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes | ||||||
GpuBindReferences.bindGpuReferences(inputProjections, inputAttributes) | ||||||
} else if (modeInfo.hasFinalMode || | ||||||
(modeInfo.hasPartialMerge && modeInfo.uniqueModes.length == 1)) { | ||||||
// two possible conditions: | ||||||
// 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of | ||||||
// AggWithOneDistinct, which needs no input projections. Because the child outputs are | ||||||
// internal aggregation buffers, which are aligned for the final stage. | ||||||
// | ||||||
// 2. The 2nd stage (PartialMerge) of AggWithOneDistinct, which works like the final stage | ||||||
// taking the child outputs as inputs without any projections. | ||||||
GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) | ||||||
} else { | ||||||
} else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode || | ||||||
modeInfo.uniqueModes.isEmpty) { | ||||||
// The first aggregation stage (including Partial or Complete or no aggExpression), | ||||||
// whose child node is not an AggregateExec. Therefore, input projections are essential. | ||||||
val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions | ||||||
.flatMap(_.aggregateFunction.inputProjection) | ||||||
GpuBindReferences.bindGpuReferences(inputProjections, childAttr) | ||||||
} else { | ||||||
// This branch should NOT be reached. | ||||||
throw new IllegalStateException(s"invalid unique modes: ${modeInfo.uniqueModes}") | ||||||
sperlingxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
|
||||||
val boundFinalProjections = if (modeInfo.hasFinalMode || modeInfo.hasCompleteMode) { | ||||||
val finalProjections = groupingExpressions ++ | ||||||
aggregateExpressions.map(_.aggregateFunction.evaluateExpression) | ||||||
Some(GpuBindReferences.bindGpuReferences(finalProjections, aggBufferAttributes)) | ||||||
} else { | ||||||
None | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments mention
distinct update expressions
, but I am not finding the use of update expressions in your change, only input projections.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated this part along with comments, in order to verify and clarify "input projections is all that is needed in that case".