Skip to content

Commit

Permalink
Add in basic support for OOM retry for project and filter (#7864)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored Mar 10, 2023
1 parent 693f3cb commit aef8306
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 153 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,8 +70,12 @@ public static synchronized void debug(String name, Table table) {
* @param cb the batch to print out.
*/
public static synchronized void debug(String name, ColumnarBatch cb) {
try (Table table = from(cb)) {
debug(name, table);
if (cb.numCols() <= 0) {
System.err.println("DEBUG " + name + " NO COLS " + cb.numRows() + " ROWS");
} else {
try (Table table = from(cb)) {
debug(name, table);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@ import ai.rapids.cudf.ast
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SortOrder}
import org.apache.spark.sql.rapids.catalyst.expressions.GpuEquivalentExpressions
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -130,19 +130,41 @@ object GpuBindReferences extends Logging {

/**
* A helper function to bind given expressions to an input schema where the expressions are
* to be processed on the GPU, and the result type indicates this. Common sub-expressions
* bound with their inputs are placed into a sequence of tiers in a GpuTieredProject object.
* to be processed on the GPU, and the result type indicates this. If runTiered is true
* Common sub-expressions will be factored out where possible to reduce the runtime and memory.
* If set to false a GpuTieredProject object will still be returned, but no common
* sub-expressions will be factored out.
*/
def bindGpuReferencesTiered[A <: Expression](
expressions: Seq[A],
input: AttributeSeq): GpuTieredProject = {

val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions)
val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input)
GpuTieredProject(exprTiers.zip(inputTiers).map {
case (es:Seq[Expression], is:AttributeSeq) =>
es.map(GpuBindReferences.bindGpuReference(_, is)).toList
}, inputTiers)
input: AttributeSeq,
runTiered: Boolean): GpuTieredProject = {

if (runTiered) {
val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions)
val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input)
// Update ExprTiers to include the columns that are pass through and drop unneeded columns
val newExprTiers = exprTiers.zipWithIndex.map {
case (exprTier, index) =>
// get what the output should look like.
val atInput = index + 1
if (atInput < inputTiers.length) {
inputTiers(atInput).attrs.map { attr =>
exprTier.find { expr =>
expr.asInstanceOf[NamedExpression].toAttribute == attr
}.getOrElse(attr)
}
} else {
exprTier
}
}
GpuTieredProject(newExprTiers.zip(inputTiers).map {
case (es: Seq[Expression], is: AttributeSeq) =>
es.map(GpuBindReferences.bindGpuReference(_, is)).toList
})
} else {
GpuTieredProject(Seq(GpuBindReferences.bindGpuReferences(expressions, input)))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,25 @@ object SpillableColumnarBatch extends Arm {
}
}

private[this] def allFromSameBuffer(batch: ColumnarBatch): Boolean = {
var bufferAddr = 0L
var isSet = false
val numColumns = batch.numCols()
(0 until numColumns).forall { i =>
batch.column(i) match {
case fb: GpuColumnVectorFromBuffer =>
if (!isSet) {
bufferAddr = fb.getBuffer.getAddress
isSet = true
true
} else {
bufferAddr == fb.getBuffer.getAddress
}
case _ => false
}
}
}

private[this] def addBatch(
batch: ColumnarBatch,
initialSpillPriority: Long,
Expand All @@ -199,8 +218,7 @@ object SpillableColumnarBatch extends Arm {
initialSpillPriority,
spillCallback)
} else if (numColumns > 0 &&
(0 until numColumns)
.forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) {
allFromSameBuffer(batch)) {
val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer]
val buff = cv.getBuffer
RapidsBufferCatalog.addBuffer(buff, cv.getTableMeta, initialSpillPriority,
Expand Down
41 changes: 16 additions & 25 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -249,16 +249,12 @@ object GpuHashAggregateIterator extends Arm with Logging {
} else {
inputAttributes
}
private val (preStepBound, preStepBoundTiered) = if (useTieredProject) {
(None, Some(GpuBindReferences.bindGpuReferencesTiered(preStep.toList,
preStepAttributes.toList)))
} else {
(Some(GpuBindReferences.bindGpuReferences(preStep, preStepAttributes.toList)), None)
}
private val preStepBound = GpuBindReferences.bindGpuReferencesTiered(preStep.toList,
preStepAttributes.toList, useTieredProject)

// a bound expression that is applied after the cuDF aggregate
private val postStepBound =
GpuBindReferences.bindGpuReferences(postStep, postStepAttr)
private val postStepBound = GpuBindReferences.bindGpuReferencesTiered(postStep.toList,
postStepAttr.toList, useTieredProject)

/**
* Apply the "pre" step: preMerge for merge, or pass-through in the update case
Expand All @@ -269,14 +265,11 @@ object GpuHashAggregateIterator extends Arm with Logging {
def preProcess(
toAggregateBatch: ColumnarBatch,
metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = {
val projectedCb = withResource(toAggregateBatch) { inputBatch =>
withResource(new NvtxRange("pre-process", NvtxColor.DARK_GREEN)) { _ =>
if (useTieredProject) {
preStepBoundTiered.get.tieredProject(inputBatch)
} else {
GpuProjectExec.project(inputBatch, preStepBound.get)
}
}
val inputBatch = SpillableColumnarBatch(toAggregateBatch,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY, metrics.spillCallback)

val projectedCb = withResource(new NvtxRange("pre-process", NvtxColor.DARK_GREEN)) { _ =>
preStepBound.projectAndCloseWithRetrySingleBatch(inputBatch, metrics.spillCallback)
}
SpillableColumnarBatch(
projectedCb,
Expand Down Expand Up @@ -382,14 +375,12 @@ object GpuHashAggregateIterator extends Arm with Logging {
* @return output batch from the aggregate
*/
def postProcess(
aggregatedSpillable: SpillableColumnarBatch): SpillableColumnarBatch = {
aggregatedSpillable: SpillableColumnarBatch,
metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = {
val postProcessed =
withResource(aggregatedSpillable) { _ =>
withResource(aggregatedSpillable.getColumnarBatch()) { aggregated =>
withResource(new NvtxRange("post-process", NvtxColor.ORANGE)) { _ =>
GpuProjectExec.project(aggregated, postStepBound)
}
}
withResource(new NvtxRange("post-process", NvtxColor.ORANGE)) { _ =>
postStepBound.projectAndCloseWithRetrySingleBatch(aggregatedSpillable,
metrics.spillCallback)
}
SpillableColumnarBatch(
postProcessed,
Expand Down Expand Up @@ -441,7 +432,7 @@ object GpuHashAggregateIterator extends Arm with Logging {

// 3) a post-processing step required in some scenarios, casting or picking
// apart a struct
helper.postProcess(aggregatedSpillable)
helper.postProcess(aggregatedSpillable, metrics)
}
}

Expand Down
Loading

0 comments on commit aef8306

Please sign in to comment.