Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in basic support for OOM retry for project and filter #7864

Merged
merged 3 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,8 +70,12 @@ public static synchronized void debug(String name, Table table) {
* @param cb the batch to print out.
*/
public static synchronized void debug(String name, ColumnarBatch cb) {
try (Table table = from(cb)) {
debug(name, table);
if (cb.numCols() <= 0) {
System.err.println("DEBUG " + name + " NO COLS " + cb.numRows() + " ROWS");
} else {
try (Table table = from(cb)) {
debug(name, table);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@ import ai.rapids.cudf.ast
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SortOrder}
import org.apache.spark.sql.rapids.catalyst.expressions.GpuEquivalentExpressions
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -130,19 +130,41 @@ object GpuBindReferences extends Logging {

/**
* A helper function to bind given expressions to an input schema where the expressions are
* to be processed on the GPU, and the result type indicates this. Common sub-expressions
* bound with their inputs are placed into a sequence of tiers in a GpuTieredProject object.
* to be processed on the GPU, and the result type indicates this. If runTiered is true,
* common sub-expressions will be factored out where possible to reduce runtime and memory use.
* If runTiered is false, a GpuTieredProject object will still be returned, but no common
* sub-expressions will be factored out.
*/
def bindGpuReferencesTiered[A <: Expression](
expressions: Seq[A],
input: AttributeSeq): GpuTieredProject = {

val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions)
val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input)
GpuTieredProject(exprTiers.zip(inputTiers).map {
case (es:Seq[Expression], is:AttributeSeq) =>
es.map(GpuBindReferences.bindGpuReference(_, is)).toList
}, inputTiers)
input: AttributeSeq,
runTiered: Boolean): GpuTieredProject = {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish I had thought to pass runTiered like this. It definitely cleans up the call sites.

if (runTiered) {
val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions)
val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input)
// Update exprTiers to include the columns that are passed through and drop unneeded columns
val newExprTiers = exprTiers.zipWithIndex.map {
case (exprTier, index) =>
// get what the output should look like.
val atInput = index + 1
if (atInput < inputTiers.length) {
inputTiers(atInput).attrs.map { attr =>
exprTier.find { expr =>
expr.asInstanceOf[NamedExpression].toAttribute == attr
}.getOrElse(attr)
}
} else {
exprTier
}
}
Comment on lines +147 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice refactoring of the code to remove the unneeded columns - simplifies the project code.

GpuTieredProject(newExprTiers.zip(inputTiers).map {
case (es: Seq[Expression], is: AttributeSeq) =>
es.map(GpuBindReferences.bindGpuReference(_, is)).toList
})
} else {
GpuTieredProject(Seq(GpuBindReferences.bindGpuReferences(expressions, input)))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,25 @@ object SpillableColumnarBatch extends Arm {
}
}

/**
 * Checks whether every column of `batch` is a `GpuColumnVectorFromBuffer` whose buffer reports
 * the same address as the first column's buffer.
 *
 * A batch with no columns yields `true`; a column of any other type yields `false`.
 *
 * @param batch the batch whose columns are inspected
 * @return true if all columns are `GpuColumnVectorFromBuffer` instances sharing one buffer address
 */
private[this] def allFromSameBuffer(batch: ColumnarBatch): Boolean = {
  // Lazily map each column to its buffer address, or None when the column is not a
  // GpuColumnVectorFromBuffer. Being an iterator, columns after the first mismatch are
  // never inspected.
  val addresses = (0 until batch.numCols()).iterator.map { idx =>
    batch.column(idx) match {
      case fromBuffer: GpuColumnVectorFromBuffer => Some(fromBuffer.getBuffer.getAddress)
      case _ => None
    }
  }
  if (!addresses.hasNext) {
    // No columns at all: vacuously true.
    true
  } else {
    addresses.next() match {
      // The first column fixes the address every remaining column has to match.
      case Some(firstAddr) => addresses.forall(_.contains(firstAddr))
      case None => false
    }
  }
}

private[this] def addBatch(
batch: ColumnarBatch,
initialSpillPriority: Long,
Expand All @@ -199,8 +218,7 @@ object SpillableColumnarBatch extends Arm {
initialSpillPriority,
spillCallback)
} else if (numColumns > 0 &&
(0 until numColumns)
.forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) {
allFromSameBuffer(batch)) {
val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer]
val buff = cv.getBuffer
RapidsBufferCatalog.addBuffer(buff, cv.getTableMeta, initialSpillPriority,
Expand Down
41 changes: 16 additions & 25 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -249,16 +249,12 @@ object GpuHashAggregateIterator extends Arm with Logging {
} else {
inputAttributes
}
private val (preStepBound, preStepBoundTiered) = if (useTieredProject) {
(None, Some(GpuBindReferences.bindGpuReferencesTiered(preStep.toList,
preStepAttributes.toList)))
} else {
(Some(GpuBindReferences.bindGpuReferences(preStep, preStepAttributes.toList)), None)
}
private val preStepBound = GpuBindReferences.bindGpuReferencesTiered(preStep.toList,
preStepAttributes.toList, useTieredProject)

// a bound expression that is applied after the cuDF aggregate
private val postStepBound =
GpuBindReferences.bindGpuReferences(postStep, postStepAttr)
private val postStepBound = GpuBindReferences.bindGpuReferencesTiered(postStep.toList,
postStepAttr.toList, useTieredProject)

/**
* Apply the "pre" step: preMerge for merge, or pass-through in the update case
Expand All @@ -269,14 +265,11 @@ object GpuHashAggregateIterator extends Arm with Logging {
def preProcess(
toAggregateBatch: ColumnarBatch,
metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = {
val projectedCb = withResource(toAggregateBatch) { inputBatch =>
withResource(new NvtxRange("pre-process", NvtxColor.DARK_GREEN)) { _ =>
if (useTieredProject) {
preStepBoundTiered.get.tieredProject(inputBatch)
} else {
GpuProjectExec.project(inputBatch, preStepBound.get)
}
}
val inputBatch = SpillableColumnarBatch(toAggregateBatch,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY, metrics.spillCallback)

val projectedCb = withResource(new NvtxRange("pre-process", NvtxColor.DARK_GREEN)) { _ =>
preStepBound.projectAndCloseWithRetrySingleBatch(inputBatch, metrics.spillCallback)
}
SpillableColumnarBatch(
projectedCb,
Expand Down Expand Up @@ -382,14 +375,12 @@ object GpuHashAggregateIterator extends Arm with Logging {
* @return output batch from the aggregate
*/
def postProcess(
aggregatedSpillable: SpillableColumnarBatch): SpillableColumnarBatch = {
aggregatedSpillable: SpillableColumnarBatch,
metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = {
val postProcessed =
withResource(aggregatedSpillable) { _ =>
withResource(aggregatedSpillable.getColumnarBatch()) { aggregated =>
withResource(new NvtxRange("post-process", NvtxColor.ORANGE)) { _ =>
GpuProjectExec.project(aggregated, postStepBound)
}
}
withResource(new NvtxRange("post-process", NvtxColor.ORANGE)) { _ =>
postStepBound.projectAndCloseWithRetrySingleBatch(aggregatedSpillable,
metrics.spillCallback)
}
SpillableColumnarBatch(
postProcessed,
Expand Down Expand Up @@ -441,7 +432,7 @@ object GpuHashAggregateIterator extends Arm with Logging {

// 3) a post-processing step required in some scenarios, casting or picking
// apart a struct
helper.postProcess(aggregatedSpillable)
helper.postProcess(aggregatedSpillable, metrics)
}
}

Expand Down
Loading