Update GpuRunningWindowExec to use OOM retry framework #8170

Merged: 31 commits, May 8, 2023. Changes shown below are from 22 of the 31 commits.
Commits
1f66965
stub out CheckpointRestore methods for fixers
andygrove Apr 20, 2023
d28d299
implement checkpoint/restore for BatchedRunningWindowFixer impls
andygrove Apr 21, 2023
6b32f1c
implement checkpoint/restore for BatchedRunningWindowFixer impls
andygrove Apr 21, 2023
aca91f1
retry around fixUpAll
andygrove Apr 21, 2023
84cb809
remove redundant class
andygrove Apr 21, 2023
32b1672
revert intellij auto formatting of imports
andygrove Apr 21, 2023
2c1e031
increase section of code contained within withRetryNoSplit
andygrove Apr 24, 2023
caa6970
add some comments
andygrove Apr 24, 2023
bb07cd6
minor cleanup
andygrove Apr 24, 2023
50c8218
save interim progress
andygrove Apr 26, 2023
64c36f2
save interim progress
andygrove Apr 26, 2023
d3bd074
close resources in checkpoint restore code
andygrove Apr 27, 2023
2485945
fix
andygrove Apr 27, 2023
22b54cf
add comment
andygrove Apr 27, 2023
b5cc792
remove retry from doAggsAndClose
andygrove Apr 27, 2023
7356e4b
move retry from doAgg to computeBasicWindow, fix test failures
andygrove Apr 28, 2023
8bb8dd4
fix double close, remove retry from computeBasicWindow
andygrove Apr 28, 2023
43c9f55
fix resource leak
andygrove Apr 28, 2023
662a543
remove comment that is no longer relevant
andygrove Apr 28, 2023
b1afce1
fix one resource leak
andygrove May 2, 2023
26c9a0a
Add retry to GpuWindowIterator.hasNext
andygrove May 3, 2023
003acd8
defensively reset checkpoints to None during restore
andygrove May 3, 2023
129bfe3
Merge remote-tracking branch 'nvidia/branch-23.06' into windows-relia…
andygrove May 4, 2023
9071cce
address feedback
andygrove May 5, 2023
9abd360
re-implement first unit test to use iterator
andygrove May 5, 2023
2f0de6e
re-implement unit tests to call GpuWindowIterator
andygrove May 5, 2023
c56fe60
fix error in test, add more assertions
andygrove May 5, 2023
189df7d
fix segfault
andygrove May 6, 2023
911e857
fix test
andygrove May 8, 2023
0c066ec
revert column order
andygrove May 8, 2023
02e2c47
remove TODO comment
andygrove May 8, 2023
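
Taken together, the commits above move the running-window code onto the plugin's OOM retry framework: each BatchedRunningWindowFixer learns to checkpoint() and restore() the state it carries between batches, and the per-batch work is wrapped in a retry over a spillable input. The sketch below is illustration only — RetrySketch, RunningMaxFixer, RetryOOM, and retryWithRestore are invented stand-ins, not the plugin's fixer implementations or the withRetryNoSplit/withRestoreOnRetry helpers they use.

```scala
// Self-contained sketch of the checkpoint/restore-around-retry shape this PR
// introduces. Everything here is an invented stand-in for illustration, not the
// plugin's CheckpointRestore trait or the RmmRapidsRetryIterator helpers.
object RetrySketch {
  trait CheckpointRestore {
    def checkpoint(): Unit
    def restore(): Unit
  }

  // Models a running-window fixer: it carries the previous batch's running max,
  // so a failed attempt must be able to undo any update to that state.
  class RunningMaxFixer extends CheckpointRestore {
    private var previous: Option[Int] = None
    private var saved: Option[Int] = None

    override def checkpoint(): Unit = { saved = previous }
    override def restore(): Unit = { previous = saved }

    def fixUp(batch: Seq[Int]): Seq[Int] = {
      val start = previous.getOrElse(Int.MinValue)
      val fixed = batch.scanLeft(start)((a, b) => math.max(a, b)).tail
      previous = fixed.lastOption.orElse(previous) // mutation a retry must roll back
      fixed
    }
  }

  class RetryOOM extends RuntimeException("simulated GPU OOM")

  // Stand-in combining what the plugin splits between withRetryNoSplit (the retry
  // loop) and withRestoreOnRetry (rolling fixer state back before re-attempting).
  def retryWithRestore[T](fixers: Seq[CheckpointRestore], maxAttempts: Int = 3)(body: => T): T = {
    fixers.foreach(_.checkpoint())
    var attempt = 1
    while (true) {
      try {
        return body
      } catch {
        case _: RetryOOM if attempt < maxAttempts =>
          fixers.foreach(_.restore()) // undo partial state before trying again
          attempt += 1
      }
    }
    sys.error("unreachable")
  }
}
```

A call like `RetrySketch.retryWithRestore(Seq(fixer)) { fixer.fixUp(values) }` then re-runs the body if it throws RetryOOM, without double-applying the fixer's carried state.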
156 changes: 81 additions & 75 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -24,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf
import ai.rapids.cudf.{AggregationOverWindow, DType, GroupByOptions, GroupByScanAggregation, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanAggregation, ScanType, Table, WindowOptions}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode}

import org.apache.spark.TaskContext
@@ -1115,36 +1114,15 @@ class GroupedAggregations {
* Do all of the aggregations and put them in the output columns. There may be extra processing
* after this before you get to a final result.
*/
def doAggsAndClose(isRunningBatched: Boolean,
def doAggs(isRunningBatched: Boolean,
boundOrderSpec: Seq[SortOrder],
orderByPositions: Array[Int],
partByPositions: Array[Int],
inputSpillable: SpillableColumnarBatch,
inputCb: ColumnarBatch,
outputColumns: Array[cudf.ColumnVector]): Unit = {
withRetryNoSplit(inputSpillable) { attempt =>
// when there are exceptions in this body, we always want to close
// `outputColumns` before a likely retry.
try {
withResource(attempt.getColumnarBatch()) { attemptCb =>
doRunningWindowOptimizedAggs(
isRunningBatched, partByPositions, attemptCb, outputColumns)
doRowAggs(
boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
doRangeAggs(
boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
}
} catch {
case t: Throwable =>
// on exceptions we want to throw away any columns in outputColumns that
// are not pass-through
val columnsToClose = outputColumns.filter(_ != null)
outputColumns.indices.foreach { col =>
outputColumns(col) = null
}
columnsToClose.safeClose(t)
throw t
}
}
doRunningWindowOptimizedAggs(isRunningBatched, partByPositions, inputCb, outputColumns)
doRowAggs(boundOrderSpec, orderByPositions, partByPositions, inputCb, outputColumns)
doRangeAggs(boundOrderSpec, orderByPositions, partByPositions, inputCb, outputColumns)
}

/**
@@ -1241,18 +1219,16 @@ trait BasicWindowCalc {
*/
def computeBasicWindow(cb: ColumnarBatch): Array[cudf.ColumnVector] = {
closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
val inputSpillable = SpillableColumnarBatch(
GpuProjectExec.project(cb, initialProjections),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)

// this takes ownership of `inputSpillable`
aggregations.doAggsAndClose(
isRunningBatched,
boundOrderSpec,
orderByPositions,
partByPositions,
inputSpillable,
outputColumns)

withResource(GpuProjectExec.project(cb, initialProjections)) { proj =>
aggregations.doAggs(
isRunningBatched,
boundOrderSpec,
orderByPositions,
partByPositions,
proj,
outputColumns)
}

// if the window aggregates were successful, let's splice the passThrough
// columns
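
The reworked computeBasicWindow above also shows the ownership rules the retry work depends on: the projected input only lives inside withResource, while the half-built output array is guarded by closeOnExcept so a failing aggregation cannot leak columns. A small stand-alone sketch of those two helpers (hypothetical Resource type and simplified signatures; the real helpers are imported from com.nvidia.spark.rapids.Arm at the top of the file):

```scala
// Illustration only: simplified stand-ins for the Arm helpers the rewritten
// computeBasicWindow leans on. Resource, buildOutputs and the signatures below
// are hypothetical.
object ArmSketch {
  final class Resource(val name: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed $name")
  }

  // Always close r, success or failure -- like the projected input batch above.
  def withResource[A <: AutoCloseable, B](r: A)(body: A => B): B =
    try body(r) finally r.close()

  // Close the partially filled outputs only if the body throws; on success the
  // caller keeps ownership -- like the outputColumns array above.
  def closeOnExcept[B](outputs: Array[Resource])(body: Array[Resource] => B): B =
    try body(outputs) catch {
      case t: Throwable =>
        outputs.foreach(r => if (r != null) r.close())
        throw t
    }

  def buildOutputs(): Array[Resource] = {
    val outputs = new Array[Resource](2)
    closeOnExcept(outputs) { out =>
      withResource(new Resource("projected input")) { proj =>
        out(0) = new Resource(s"agg over ${proj.name}")
        out(1) = new Resource(s"scan over ${proj.name}")
      }
      out
    }
  }
}
```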
@@ -1291,14 +1267,17 @@ class GpuWindowIterator(
override def hasNext: Boolean = input.hasNext

override def next(): ColumnarBatch = {
withResource(input.next()) { cb =>
withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ =>
val ret = withResource(computeBasicWindow(cb)) { cols =>
convertToBatch(outputTypes, cols)
val cbSpillable = SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY)
withRetryNoSplit(cbSpillable) { _ =>
withResource(cbSpillable.getColumnarBatch()) { cb =>
withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ =>
val ret = withResource(computeBasicWindow(cb)) { cols =>
convertToBatch(outputTypes, cols)
}
numOutputBatches += 1
numOutputRows += ret.numRows()
ret
}
numOutputBatches += 1
numOutputRows += ret.numRows()
ret
}
}
}
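
In next() above, the incoming batch is wrapped in a SpillableColumnarBatch before entering withRetryNoSplit, so the framework can spill it under memory pressure and each attempt re-materializes it with getColumnarBatch(). A toy model of that flow follows — SimpleSpillable and retryNoSplitSketch are invented names, not the plugin's classes:

```scala
// Toy model of the spillable-input-plus-retry flow in next(); every name here is
// a stand-in for illustration.
object SpillRetrySketch {
  class RetryOOM extends RuntimeException("simulated GPU OOM")

  // Holds data that could be moved off the GPU between attempts; this sketch just
  // keeps it on the heap.
  final class SimpleSpillable[T](value: T) extends AutoCloseable {
    private var closed = false
    def get: T = { require(!closed, "already closed"); value }
    override def close(): Unit = { closed = true }
  }

  // Takes ownership of the spillable: it is closed exactly once, whether the body
  // succeeds, exhausts its retries, or fails with a non-retryable error.
  def retryNoSplitSketch[T, R](spillable: SimpleSpillable[T], maxAttempts: Int = 3)
      (body: T => R): R = {
    try {
      var attempt = 1
      while (true) {
        try {
          return body(spillable.get) // each attempt re-reads from the spillable
        } catch {
          case _: RetryOOM if attempt < maxAttempts => attempt += 1
        }
      }
      sys.error("unreachable")
    } finally {
      spillable.close()
    }
  }
}
```

Further down, computeRunning applies the same idea and takes ownership of its input (the `// takes ownership of cb` comment), which is why next() in GpuRunningWindowIterator no longer wraps the batch in withResource itself.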
@@ -1483,33 +1462,60 @@ class GpuRunningWindowIterator(
}
}

def computeRunning(cb: ColumnarBatch): ColumnarBatch = {
def computeRunning(input: ColumnarBatch): ColumnarBatch = {
val fixers = fixerIndexMap
val numRows = cb.numRows()

withResource(computeBasicWindow(cb)) { basic =>
withResource(GpuProjectExec.project(cb, boundPartitionSpec)) { parts =>
val partColumns = GpuColumnVector.extractBases(parts)
withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
val fixedUp = if (fixerNeedsOrderMask) {
withResource(GpuProjectExec.project(cb, boundOrderColumns)) { order =>
val orderColumns = GpuColumnVector.extractBases(order)
// We need to fix up the rows that are part of the same batch as the end of the
// last batch
withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
orderEqual =>
closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) { fixed =>
saveLastOrder(getScalarRow(numRows - 1, orderColumns))
fixed
val numRows = input.numRows()
val cbSpillable = SpillableColumnarBatch(input, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
withRetryNoSplit(cbSpillable) { _ =>
withResource(cbSpillable.getColumnarBatch()) { cb =>
withResource(computeBasicWindow(cb)) { basic =>
var newOrder: Option[Array[Scalar]] = None
var newParts: Option[Array[Scalar]] = None
val fixedUp = try {
// we back up the fixers' state and restore it in the event of a retry
withRestoreOnRetry(fixers.values.toSeq) {
withResource(GpuProjectExec.project(cb,
boundPartitionSpec)) { parts =>
val partColumns = GpuColumnVector.extractBases(parts)
withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
val fixedUp = if (fixerNeedsOrderMask) {
withResource(GpuProjectExec.project(cb,
boundOrderColumns)) { order =>
val orderColumns = GpuColumnVector.extractBases(order)
// We need to fix up the rows that are part of the same batch as the end of
// the last batch
withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
orderEqual =>
closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
fixedUp =>
newOrder = Some(getScalarRow(numRows - 1, orderColumns))
fixedUp
}
}
}
} else {
// No ordering needed
fixUpAll(basic, fixers, partsEqual, None)
}
newParts = Some(getScalarRow(numRows - 1, partColumns))
fixedUp
}
}
}
} else {
// No ordering needed
fixUpAll(basic, fixers, partsEqual, None)
} catch {
case t: Throwable =>
// avoid leaking unused interim results
newOrder.foreach(_.foreach(_.close()))
newParts.foreach(_.foreach(_.close()))
throw t
}
// this section is outside of the retry logic because the calls to saveLastParts
// and saveLastOrders can potentially close GPU resources
withResource(fixedUp) { fixed =>
saveLastParts(getScalarRow(numRows - 1, partColumns))
closeOnExcept(fixedUp) { _ =>
newOrder.foreach(saveLastOrder)
newParts.foreach(saveLastParts)
}
convertToBatch(outputTypes, fixed)
}
}
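
The saveLastOrder/saveLastParts calls above sit outside the retried section because, as the in-code comment notes, committing new state can close GPU resources (the previously saved scalars), and a later retry could not bring those back. The stand-ins below (Handle, Tracker — invented for illustration) show why such a commit is irreversible:

```scala
// Invented stand-ins illustrating why state commits must wait until the retryable
// work has succeeded: committing a new handle closes the old one.
object CommitSketch {
  final class Handle(val id: Int) extends AutoCloseable {
    var closed = false
    override def close(): Unit = { closed = true }
  }

  final class Tracker {
    private var last: Option[Handle] = None

    // Irreversible: the previously saved handle is closed as soon as new state
    // is committed, analogous to saveLastParts/saveLastOrder replacing the
    // scalars from the prior batch.
    def commit(next: Handle): Unit = {
      last.foreach(_.close())
      last = Some(next)
    }

    def lastHandle: Option[Handle] = last
  }
}
```

If the commit ran inside the retried body, a failure thrown afterwards would leave the old state already released, so restore() could not reinstate anything usable; collecting the new values in newOrder/newParts during the attempt and committing them only after the retry block avoids that.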
@@ -1544,13 +1550,13 @@ class GpuRunningWindowIterator(
}

override def next(): ColumnarBatch = {
withResource(readNextInputBatch()) { cb =>
withResource(new NvtxWithMetrics("RunningWindow", NvtxColor.CYAN, opTime)) { _ =>
val ret = computeRunning(cb)
numOutputBatches += 1
numOutputRows += ret.numRows()
ret
}
// TODO maybe should create spillable batch here before calling computeRunning
Review comment (collaborator): nit: Remove the TODO; what we have is fine, especially once the spill code changes are in so that making something spillable is super cheap.

val cb = readNextInputBatch()
withResource(new NvtxWithMetrics("RunningWindow", NvtxColor.CYAN, opTime)) { _ =>
val ret = computeRunning(cb) // takes ownership of cb
numOutputBatches += 1
numOutputRows += ret.numRows()
ret
}
}
}
Second changed file:
@@ -786,7 +786,7 @@ trait GpuRunningWindowFunction extends GpuWindowFunction {
* </code>
* which can be output.
*/
trait BatchedRunningWindowFixer extends AutoCloseable {
trait BatchedRunningWindowFixer extends AutoCloseable with CheckpointRestore {
/**
* Fix up `windowedColumnOutput` with any stored state from previous batches.
* Like all window operations the input data will have been sorted by the partition
@@ -976,6 +976,26 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
extends BatchedRunningWindowFixer with Logging {
private var previousResult: Option[Scalar] = None

// checkpoint
private var checkpointResult: Option[Scalar] = None

override def checkpoint(): Unit = {
checkpointResult = previousResult
}

override def restore(): Unit = {
if (checkpointResult.isDefined) {
// close previous result
previousResult match {
case Some(r) if r != checkpointResult.get =>
r.close()
case _ =>
}
previousResult = checkpointResult
checkpointResult = None
}
}

def getPreviousResult: Option[Scalar] = previousResult

def updateState(finalOutputColumn: cudf.ColumnVector): Unit = {
@@ -1025,6 +1045,38 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
private var previousResult: Option[Scalar] = None
private var previousOverflow: Option[Scalar] = None

// checkpoint
private var checkpointResult: Option[Scalar] = None
private var checkpointOverflow: Option[Scalar] = None

override def checkpoint(): Unit = {
checkpointOverflow = previousOverflow
checkpointResult = previousResult
}

override def restore(): Unit = {
if (checkpointOverflow.isDefined) {
// close previous result
previousOverflow match {
case Some(r) if r != checkpointOverflow.get =>
r.close()
case _ =>
}
previousOverflow = checkpointOverflow
checkpointOverflow = None
}
if (checkpointResult.isDefined) {
// close previous result
previousResult match {
case Some(r) if r != checkpointResult.get =>
r.close()
case _ =>
}
previousResult = checkpointResult
checkpointResult = None
}
}

def updateState(finalOutputColumn: cudf.ColumnVector,
wasOverflow: Option[cudf.ColumnVector]): Unit = {
val lastIndex = finalOutputColumn.getRowCount.toInt - 1
@@ -1255,6 +1307,28 @@ class RankFixer extends BatchedRunningWindowFixer with Logging {
// The previous rank value
private[this] var previousRank: Option[Scalar] = None

// checkpoint
private[this] var checkpointRank: Option[Scalar] = None

override def checkpoint(): Unit = {
rowNumFixer.checkpoint()
checkpointRank = previousRank
}

override def restore(): Unit = {
rowNumFixer.restore()
if (checkpointRank.isDefined) {
// close previous result
previousRank match {
case Some(r) if r != checkpointRank.get =>
r.close()
case _ =>
}
previousRank = checkpointRank
checkpointRank = None
}
}

override def needsOrderMask: Boolean = true

override def fixUp(
@@ -1353,6 +1427,26 @@ class DenseRankFixer extends BatchedRunningWindowFixer with Logging {

private var previousRank: Option[Scalar] = None

// checkpoint
private var checkpointRank: Option[Scalar] = None

override def checkpoint(): Unit = {
checkpointRank = previousRank
}

override def restore(): Unit = {
if (checkpointRank.isDefined) {
// close previous result
previousRank match {
case Some(r) if r != checkpointRank.get =>
r.close()
case _ =>
}
previousRank = checkpointRank
checkpointRank = None
}
}

override def needsOrderMask: Boolean = true

override def fixUp(