From 1f66965b6390300512de9836514168edffce8b2f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 20 Apr 2023 14:09:15 -0600
Subject: [PATCH 01/30] stub out CheckpointRestore methods for fixers

---
 .../spark/rapids/GpuWindowExpression.scala    | 26 ++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 85d75a58d8a..c77458d5ab4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -786,7 +786,7 @@ trait GpuRunningWindowFunction extends GpuWindowFunction {
  *
  * which can be output.
  */
-trait BatchedRunningWindowFixer extends AutoCloseable {
+trait BatchedRunningWindowFixer extends AutoCloseable with CheckpointRestore {
   /**
    * Fix up `windowedColumnOutput` with any stored state from previous batches.
    * Like all window operations the input data will have been sorted by the partition
@@ -976,6 +976,12 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
     extends BatchedRunningWindowFixer with Logging {
   private var previousResult: Option[Scalar] = None
 
+  override def checkpoint(): Unit = {
+  }
+
+  override def restore(): Unit = {
+  }
+
   def getPreviousResult: Option[Scalar] = previousResult
 
   def updateState(finalOutputColumn: cudf.ColumnVector): Unit = {
@@ -1025,6 +1031,12 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
   private var previousResult: Option[Scalar] = None
   private var previousOverflow: Option[Scalar] = None
 
+  override def checkpoint(): Unit = {
+  }
+
+  override def restore(): Unit = {
+  }
+
   def updateState(finalOutputColumn: cudf.ColumnVector,
       wasOverflow: Option[cudf.ColumnVector]): Unit = {
     val lastIndex = finalOutputColumn.getRowCount.toInt - 1
@@ -1255,6 +1267,12 @@ class RankFixer extends BatchedRunningWindowFixer with Logging {
   // The previous rank value
   private[this] var previousRank: Option[Scalar] = None
 
+  override def checkpoint(): Unit = {
+  }
+
+  override def restore(): Unit = {
+  }
+
   override def needsOrderMask: Boolean = true
 
   override def fixUp(
@@ -1353,6 +1371,12 @@ class DenseRankFixer extends BatchedRunningWindowFixer with Logging {
 
   private var previousRank: Option[Scalar] = None
 
+  override def checkpoint(): Unit = {
+  }
+
+  override def restore(): Unit = {
+  }
+
   override def needsOrderMask: Boolean = true
 
   override def fixUp(

From d28d299be1c46d9b2e066d7bf3d29ee5390dde04 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 21 Apr 2023 12:35:30 -0600
Subject: [PATCH 02/30] implement checkpoint/restore for
 BatchedRunningWindowFixer impls

---
 .../spark/rapids/GpuWindowExpression.scala    | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index c77458d5ab4..713d8158414 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -976,10 +976,15 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
     extends BatchedRunningWindowFixer with Logging {
   private var previousResult: Option[Scalar] = None
 
+  // checkpoint
+  private var _previousResult: Option[Scalar] = None
+
   override def checkpoint(): Unit = {
+    _previousResult = previousResult
   }
 
   override def restore(): Unit = {
+    previousResult = _previousResult
   }
 
   def getPreviousResult: Option[Scalar] = previousResult
@@ -1031,10 +1036,18 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
   private var previousResult: Option[Scalar] = None
   private var previousOverflow: Option[Scalar] = None
 
+  // checkpoint
+  private var _previousResult: Option[Scalar] = None
+  private var _previousOverflow: Option[Scalar] = None
+
   override def checkpoint(): Unit = {
+    _previousOverflow = previousOverflow
+    _previousResult = previousResult
   }
 
   override def restore(): Unit = {
+    previousOverflow = _previousOverflow
+    previousResult = _previousResult
   }
 
   def updateState(finalOutputColumn: cudf.ColumnVector,
@@ -1267,10 +1280,17 @@ class RankFixer extends BatchedRunningWindowFixer with Logging {
   // The previous rank value
   private[this] var previousRank: Option[Scalar] = None
 
+  // checkpoint
+  private[this] var _previousRank: Option[Scalar] = None
+
   override def checkpoint(): Unit = {
+    rowNumFixer.checkpoint()
+    _previousRank = previousRank
   }
 
   override def restore(): Unit = {
+    rowNumFixer.restore()
+    previousRank = _previousRank
   }
 
   override def needsOrderMask: Boolean = true

From 6b32f1cf4640140f55c33f91494609bd3c101e8e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 21 Apr 2023 12:54:13 -0600
Subject: [PATCH 03/30] implement checkpoint/restore for
 BatchedRunningWindowFixer impls

Signed-off-by: Andy Grove
---
 .../scala/com/nvidia/spark/rapids/GpuWindowExpression.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 713d8158414..0958745a7df 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -1391,10 +1391,15 @@ class DenseRankFixer extends BatchedRunningWindowFixer with Logging {
 
   private var previousRank: Option[Scalar] = None
 
+  // checkpoint
+  private var _previousRank: Option[Scalar] = None
+
   override def checkpoint(): Unit = {
+    _previousRank = previousRank
   }
 
   override def restore(): Unit = {
+    previousRank = _previousRank
   }
 
   override def needsOrderMask: Boolean = true

From aca91f186f854432d2d5909a7062ae18d9f48cb6 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 21 Apr 2023 14:42:59 -0600
Subject: [PATCH 04/30] retry around fixUpAll

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 32 +++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 163fde475c5..05187453030 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -17,17 +17,14 @@
 package com.nvidia.spark.rapids
 
 import java.util.concurrent.TimeUnit
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-
 import ai.rapids.cudf
 import ai.rapids.cudf.{AggregationOverWindow, DType, GroupByOptions, GroupByScanAggregation, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanAggregation, ScanType, Table, WindowOptions}
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
-import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit}
 import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -37,7 +34,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.rapids.GpuAggregateExpression
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
@@ -1498,18 +1495,29 @@ class GpuRunningWindowIterator(
             // last batch
             withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
               orderEqual =>
-                closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) { fixed =>
+                val fixedUp = withRetryNoSplit {
+                  withRestoreOnRetry(CheckpointableFixers(fixers.values.toSeq)) {
+                    fixUpAll(basic, fixers, partsEqual, Some(orderEqual))
+                  }
+                }
+                closeOnExcept(fixedUp) { _ =>
                   saveLastOrder(getScalarRow(numRows - 1, orderColumns))
-                  fixed
+                  fixedUp
                 }
             }
           }
         } else {
           // No ordering needed
-          fixUpAll(basic, fixers, partsEqual, None)
+          withRetryNoSplit {
+            withRestoreOnRetry(CheckpointableFixers(fixers.values.toSeq)) {
+              fixUpAll(basic, fixers, partsEqual, None)
+            }
+          }
         }
         withResource(fixedUp) { fixed =>
-          saveLastParts(getScalarRow(numRows - 1, partColumns))
+          closeOnExcept(fixedUp) { _ =>
+            saveLastParts(getScalarRow(numRows - 1, partColumns))
+          }
           convertToBatch(outputTypes, fixed)
         }
       }
@@ -1555,6 +1563,12 @@ class GpuRunningWindowIterator(
   }
 }
 
+case class CheckpointableFixers(fixers: Seq[BatchedRunningWindowFixer])
+  extends CheckpointRestore {
+  override def checkpoint(): Unit = fixers.foreach(_.checkpoint())
+  override def restore(): Unit = fixers.foreach(_.restore())
+}
+
 /**
  * This allows for batches of data to be processed without needing them to correspond to
  * the partition by boundaries, but only for window operations that are unbounded preceding

From 84cb809b1988ea558b0b566579adfe321c1ef64b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 21 Apr 2023 15:09:35 -0600
Subject: [PATCH 05/30] remove redundant class

---
 .../scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 05187453030..3b3aa4c9e0c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1496,7 +1496,7 @@ class GpuRunningWindowIterator(
             withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
               orderEqual =>
                 val fixedUp = withRetryNoSplit {
-                  withRestoreOnRetry(CheckpointableFixers(fixers.values.toSeq)) {
+                  withRestoreOnRetry(fixers.values.toSeq) {
                     fixUpAll(basic, fixers, partsEqual, Some(orderEqual))
                   }
                 }
@@ -1509,7 +1509,7 @@ class GpuRunningWindowIterator(
         } else {
           // No ordering needed
           withRetryNoSplit {
-            withRestoreOnRetry(CheckpointableFixers(fixers.values.toSeq)) {
+            withRestoreOnRetry(fixers.values.toSeq) {
               fixUpAll(basic, fixers, partsEqual, None)
             }
           }
@@ -1563,12 +1563,6 @@ class GpuRunningWindowIterator(
   }
 }
 
-case class CheckpointableFixers(fixers: Seq[BatchedRunningWindowFixer])
-  extends CheckpointRestore {
-  override def checkpoint(): Unit = fixers.foreach(_.checkpoint())
-  override def restore(): Unit = fixers.foreach(_.restore())
-}
-
 /**
  * This allows for batches of data to be processed without needing them to correspond to
  * the partition by boundaries, but only for window operations that are unbounded preceding

From 32b1672bef05858c6446ea0407ab65e9341d06f3 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 21 Apr 2023 15:43:51 -0600
Subject: [PATCH 06/30] revert intellij auto formatting of imports

---
 .../main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 3b3aa4c9e0c..966ebceed59 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -17,14 +17,17 @@
 package com.nvidia.spark.rapids
 
 import java.util.concurrent.TimeUnit
+
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+
 import ai.rapids.cudf
 import ai.rapids.cudf.{AggregationOverWindow, DType, GroupByOptions, GroupByScanAggregation, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanAggregation, ScanType, Table, WindowOptions}
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit}
 import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode}
+
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -34,7 +37,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.rapids.GpuAggregateExpression
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.unsafe.types.CalendarInterval
 
 /**

From 2c1e0317c187e637b3cdf8c5c92b881095d73e38 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 24 Apr 2023 12:31:50 -0600
Subject: [PATCH 07/30] increase section of code contained within
 withRetryNoSplit

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 42 +++++++++----------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 966ebceed59..dbfc8a6c1b7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1487,38 +1487,38 @@ class GpuRunningWindowIterator(
     val fixers = fixerIndexMap
     val numRows = cb.numRows()
 
+    var newLastOrder: Option[Array[Scalar]] = None
+
     withResource(computeBasicWindow(cb)) { basic =>
       withResource(GpuProjectExec.project(cb, boundPartitionSpec)) { parts =>
         val partColumns = GpuColumnVector.extractBases(parts)
         withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
-          val fixedUp = if (fixerNeedsOrderMask) {
-            withResource(GpuProjectExec.project(cb, boundOrderColumns)) { order =>
-              val orderColumns = GpuColumnVector.extractBases(order)
-              // We need to fix up the rows that are part of the same batch as the end of the
-              // last batch
-              withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
-                orderEqual =>
-                  val fixedUp = withRetryNoSplit {
-                    withRestoreOnRetry(fixers.values.toSeq) {
-                      fixUpAll(basic, fixers, partsEqual, Some(orderEqual))
-                    }
-                  }
-                  closeOnExcept(fixedUp) { _ =>
-                    saveLastOrder(getScalarRow(numRows - 1, orderColumns))
-                    fixedUp
+          val fixedUp = withRetryNoSplit {
+            withRestoreOnRetry(fixers.values.toSeq) {
+              if (fixerNeedsOrderMask) {
+                withResource(GpuProjectExec.project(cb, boundOrderColumns)) { order =>
+                  val orderColumns = GpuColumnVector.extractBases(order)
+                  // We need to fix up the rows that are part of the same batch as the end of the
+                  // last batch
+                  withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
+                    orderEqual =>
+                      val fixedUp =
+                        fixUpAll(basic, fixers, partsEqual, Some(orderEqual))
+                      closeOnExcept(fixedUp) { _ =>
+                        newLastOrder = Some(getScalarRow(numRows - 1, orderColumns))
+                        fixedUp
+                      }
                   }
-              }
-            }
-          } else {
-            // No ordering needed
-            withRetryNoSplit {
-              withRestoreOnRetry(fixers.values.toSeq) {
+                }
+              } else {
+                // No ordering needed
                 fixUpAll(basic, fixers, partsEqual, None)
               }
             }
           }
          withResource(fixedUp) { fixed =>
            closeOnExcept(fixedUp) { _ =>
+             newLastOrder.foreach(saveLastOrder)
              saveLastParts(getScalarRow(numRows - 1, partColumns))
            }
            convertToBatch(outputTypes, fixed)
          }
        }

From caa69707afc8afb35757555e234eb1de5cbee3c0 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 24 Apr 2023 13:14:46 -0600
Subject: [PATCH 08/30] add some comments

---
 .../main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index dbfc8a6c1b7..a19376914a6 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1493,6 +1493,8 @@ class GpuRunningWindowIterator(
       withResource(GpuProjectExec.project(cb, boundPartitionSpec)) { parts =>
         val partColumns = GpuColumnVector.extractBases(parts)
         withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
+
+          // we backup the fixers state and restore it in the event of a retry
           val fixedUp = withRetryNoSplit {
             withRestoreOnRetry(fixers.values.toSeq) {
               if (fixerNeedsOrderMask) {
@@ -1516,6 +1518,9 @@ class GpuRunningWindowIterator(
             }
           }
         }
+
+        // this section is outside of the retry logic because the calls to saveLastParts
+        // and saveLastOrders can close GPU resources
        withResource(fixedUp) { fixed =>
          closeOnExcept(fixedUp) { _ =>
            newLastOrder.foreach(saveLastOrder)
            saveLastParts(getScalarRow(numRows - 1, partColumns))
          }
          convertToBatch(outputTypes, fixed)

From bb07cd6c5c89ceba9dd8bd9d62b1bf49bb84eec2 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 24 Apr 2023 13:29:20 -0600
Subject: [PATCH 09/30] minor cleanup

---
 .../scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index a19376914a6..a891c1f968a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1487,7 +1487,7 @@ class GpuRunningWindowIterator(
     val fixers = fixerIndexMap
     val numRows = cb.numRows()
 
-    var newLastOrder: Option[Array[Scalar]] = None
+    var newOrder: Option[Array[Scalar]] = None
 
     withResource(computeBasicWindow(cb)) { basic =>
       withResource(GpuProjectExec.project(cb, boundPartitionSpec)) { parts =>
         val partColumns = GpuColumnVector.extractBases(parts)
@@ -1504,10 +1504,9 @@ class GpuRunningWindowIterator(
                   // last batch
                   withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
                     orderEqual =>
-                      val fixedUp =
-                        fixUpAll(basic, fixers, partsEqual, Some(orderEqual))
-                      closeOnExcept(fixedUp) { _ =>
-                        newLastOrder = Some(getScalarRow(numRows - 1, orderColumns))
-                        fixedUp
+                      closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
+                        fixedUp =>
+                          newOrder = Some(getScalarRow(numRows - 1, orderColumns))
+                          fixedUp
                       }
                   }
@@ -1522,7 +1522,7 @@ class GpuRunningWindowIterator(
         // this section is outside of the retry logic because the calls to saveLastParts
         withResource(fixedUp) { fixed =>
           closeOnExcept(fixedUp) { _ =>
-            newLastOrder.foreach(saveLastOrder)
+            newOrder.foreach(saveLastOrder)
             saveLastParts(getScalarRow(numRows - 1, partColumns))
           }
           convertToBatch(outputTypes, fixed)

From 50c82184ecfa39c5a4f2cc9032c5a769c19923f8 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 26 Apr 2023 15:32:29 -0600
Subject: [PATCH 10/30] save interim progress

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 64 ++++++++++++++-----
 1 file changed, 48 insertions(+), 16 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index a891c1f968a..fbd6ebff8f5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1266,6 +1266,34 @@ trait BasicWindowCalc {
     }
   }
 
+  def computeBasicWindowSpillable(cb: SpillableColumnarBatch): Array[cudf.ColumnVector] = {
+    closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
+      val inputSpillable = SpillableColumnarBatch(
+        GpuProjectExec.project(cb.getColumnarBatch(), initialProjections),
+        SpillPriorities.ACTIVE_BATCHING_PRIORITY)
+
+      // this takes ownership of `inputSpillable`
+      aggregations.doAggsAndClose(
+        isRunningBatched,
+        boundOrderSpec,
+        orderByPositions,
+        partByPositions,
+        inputSpillable,
+        outputColumns)
+
+      // if the window aggregates were successful, lets splice the passThrough
+      // columns
+      passThrough.foreach {
+        case (inputIndex, outputIndex) =>
+          outputColumns(outputIndex) =
+            cb.getColumnarBatch().column(inputIndex)
+              .asInstanceOf[GpuColumnVector].getBase.incRefCount()
+      }
+
+      outputColumns
+    }
+  }
+
   def convertToBatch(dataTypes: Array[DataType],
       cols: Array[cudf.ColumnVector]): ColumnarBatch =
     aggregations.convertToColumnarBatch(dataTypes, cols)
@@ -1483,31 +1511,36 @@ class GpuRunningWindowIterator(
     }
   }
 
-  def computeRunning(cb: ColumnarBatch): ColumnarBatch = {
+  def computeRunning(input: ColumnarBatch): ColumnarBatch = {
     val fixers = fixerIndexMap
-    val numRows = cb.numRows()
-
-    var newOrder: Option[Array[Scalar]] = None
-
-    withResource(computeBasicWindow(cb)) { basic =>
-      withResource(GpuProjectExec.project(cb, boundPartitionSpec)) { parts =>
-        val partColumns = GpuColumnVector.extractBases(parts)
-        withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
+    val numRows = input.numRows()
+
+    val cbSpillable = SpillableColumnarBatch(input, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
+    withRetryNoSplit(cbSpillable) { _ =>
+      // note that computeBasicWindowSpillable also creates a spillable
+      // batch based on a projection applied to cbSpillable so this probably
+      // needs more work to avoid creating two spillable batches?
+      withResource(computeBasicWindowSpillable(cbSpillable)) { basic =>
+        withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+          boundPartitionSpec)) { parts =>
+          val partColumns = GpuColumnVector.extractBases(parts)
 
           // we backup the fixers state and restore it in the event of a retry
+          var newOrder: Option[Array[Scalar]] = None
           val fixedUp = withRestoreOnRetry(fixers.values.toSeq) {
+            withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
               if (fixerNeedsOrderMask) {
-                withResource(GpuProjectExec.project(cb, boundOrderColumns)) { order =>
+                withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+                  boundOrderColumns)) { order =>
                   val orderColumns = GpuColumnVector.extractBases(order)
                   // We need to fix up the rows that are part of the same batch as the end of the
                   // last batch
                   withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
                     orderEqual =>
                       closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
                         fixedUp =>
                           newOrder = Some(getScalarRow(numRows - 1, orderColumns))
                           fixedUp
                       }
                   }
                 }
               } else {
                 // No ordering needed
                 fixUpAll(basic, fixers, partsEqual, None)
               }
             }
           }
           // this section is outside of the retry logic because the calls to saveLastParts
           // and saveLastOrders can close GPU resources
           withResource(fixedUp) { fixed =>
             closeOnExcept(fixedUp) { _ =>
               newOrder.foreach(saveLastOrder)
               saveLastParts(getScalarRow(numRows - 1, partColumns))
             }
             convertToBatch(outputTypes, fixed)
           }

From 64c36f28b302380c745aa418ee9539454cba1f43 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 26 Apr 2023 15:44:51 -0600
Subject: [PATCH 11/30] save interim progress

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 35 ++++++++++---------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index fbd6ebff8f5..0c6771bf378 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1521,17 +1521,18 @@ class GpuRunningWindowIterator(
       // batch based on a projection applied to cbSpillable so this probably
       // needs more work to avoid creating two spillable batches?
       withResource(computeBasicWindowSpillable(cbSpillable)) { basic =>
-        withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
-          boundPartitionSpec)) { parts =>
-          val partColumns = GpuColumnVector.extractBases(parts)
+        // we backup the fixers state and restore it in the event of a retry
+        var newOrder: Option[Array[Scalar]] = None
+        var newParts: Option[Array[Scalar]] = None
+        val fixedUp = withRestoreOnRetry(fixers.values.toSeq) {
+          withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+            boundPartitionSpec)) { parts =>
+            val partColumns = GpuColumnVector.extractBases(parts)
 
-          // we backup the fixers state and restore it in the event of a retry
-          var newOrder: Option[Array[Scalar]] = None
-          val fixedUp = withRestoreOnRetry(fixers.values.toSeq) {
             withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
-              if (fixerNeedsOrderMask) {
+              val fixedUp = if (fixerNeedsOrderMask) {
                 withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
-                  boundOrderColumns)) { order =>
+                    boundOrderColumns)) { order =>
                   val orderColumns = GpuColumnVector.extractBases(order)
                   // We need to fix up the rows that are part of the same batch as the end of the
                   // last batch
                   withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
                     orderEqual =>
                       closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
                         fixedUp =>
                           newOrder = Some(getScalarRow(numRows - 1, orderColumns))
                           fixedUp
                       }
                   }
                 }
               } else {
                 // No ordering needed
                 fixUpAll(basic, fixers, partsEqual, None)
               }
+              newParts = Some(getScalarRow(numRows - 1, partColumns))
+              fixedUp
             }
           }
-          // this section is outside of the retry logic because the calls to saveLastParts
-          // and saveLastOrders can close GPU resources
-          withResource(fixedUp) { fixed =>
-            closeOnExcept(fixedUp) { _ =>
-              newOrder.foreach(saveLastOrder)
-              saveLastParts(getScalarRow(numRows - 1, partColumns))
-            }
-            convertToBatch(outputTypes, fixed)
+        }
+        // this section is outside of the retry logic because the calls to saveLastParts
+        // and saveLastOrders can potentially close GPU resources
+        withResource(fixedUp) { fixed =>
+          closeOnExcept(fixedUp) { _ =>
+            newOrder.foreach(saveLastOrder)
+            newParts.foreach(saveLastParts)
           }
+          convertToBatch(outputTypes, fixed)
         }

From d3bd0741df6913127ba786673ada393b6d1b3ccb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Apr 2023 10:18:06 -0600
Subject: [PATCH 12/30] close resources in checkpoint restore code

---
 .../spark/rapids/GpuWindowExpression.scala    | 70 +++++++++++++++----
 1 file changed, 55 insertions(+), 15 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 0958745a7df..44c8b235b69 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -977,14 +977,22 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
   private var previousResult: Option[Scalar] = None
 
   // checkpoint
-  private var _previousResult: Option[Scalar] = None
+  private var checkpointResult: Option[Scalar] = None
 
   override def checkpoint(): Unit = {
-    _previousResult = previousResult
+    checkpointResult = previousResult
   }
 
   override def restore(): Unit = {
-    previousResult = _previousResult
+    if (checkpointResult.isDefined) {
+      // close previous result
+      checkpointResult match {
+        case Some(r) if r != checkpointResult.get =>
+          r.close()
+        case _ =>
+      }
+      checkpointResult = checkpointResult
+    }
   }
 
   def getPreviousResult: Option[Scalar] = previousResult
@@ -1037,17 +1045,33 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
   private var previousResult: Option[Scalar] = None
   private var previousOverflow: Option[Scalar] = None
 
   // checkpoint
-  private var _previousResult: Option[Scalar] = None
-  private var _previousOverflow: Option[Scalar] = None
+  private var checkpointResult: Option[Scalar] = None
+  private var checkpointOverflow: Option[Scalar] = None
 
   override def checkpoint(): Unit = {
-    _previousOverflow = previousOverflow
-    _previousResult = previousResult
+    checkpointOverflow = previousOverflow
+    checkpointResult = previousResult
   }
 
   override def restore(): Unit = {
-    previousOverflow = _previousOverflow
-    previousResult = _previousResult
+    if (checkpointOverflow.isDefined) {
+      // close previous result
+      previousOverflow match {
+        case Some(r) if r != checkpointOverflow.get =>
+          r.close()
+        case _ =>
+      }
+      previousOverflow = checkpointOverflow
+    }
+    if (checkpointResult.isDefined) {
+      // close previous result
+      previousResult match {
+        case Some(r) if r != checkpointResult.get =>
+          r.close()
+        case _ =>
+      }
+      previousResult = checkpointResult
+    }
   }
 
   def updateState(finalOutputColumn: cudf.ColumnVector,
@@ -1305,16 +1329,24 @@ class RankFixer extends BatchedRunningWindowFixer with Logging {
   private[this] var previousRank: Option[Scalar] = None
 
   // checkpoint
-  private[this] var _previousRank: Option[Scalar] = None
+  private[this] var checkpointRank: Option[Scalar] = None
 
   override def checkpoint(): Unit = {
     rowNumFixer.checkpoint()
-    _previousRank = previousRank
+    checkpointRank = previousRank
   }
 
   override def restore(): Unit = {
     rowNumFixer.restore()
-    previousRank = _previousRank
+    if (checkpointRank.isDefined) {
+      // close previous result
+      previousRank match {
+        case Some(r) if r != checkpointRank.get =>
+          r.close()
+        case _ =>
+      }
+      previousRank = checkpointRank
+    }
   }
 
   override def needsOrderMask: Boolean = true
@@ -1424,14 +1456,22 @@ class DenseRankFixer extends BatchedRunningWindowFixer with Logging {
   private var previousRank: Option[Scalar] = None
 
   // checkpoint
-  private var _previousRank: Option[Scalar] = None
+  private var checkpointRank: Option[Scalar] = None
 
   override def checkpoint(): Unit = {
-    _previousRank = previousRank
+    checkpointRank = previousRank
   }
 
   override def restore(): Unit = {
-    previousRank = _previousRank
+    if (checkpointRank.isDefined) {
+      // close previous result
+      previousRank match {
+        case Some(r) if r != checkpointRank.get =>
+          r.close()
+        case _ =>
+      }
+      previousRank = checkpointRank
+    }
   }
 
   override def needsOrderMask: Boolean = true

From 24859452c0564c37c4407bf349d9fa88f1aa949a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Apr 2023 10:19:27 -0600
Subject: [PATCH 13/30] fix

---
 .../scala/com/nvidia/spark/rapids/GpuWindowExpression.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 44c8b235b69..342da3a5367 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -986,12 +986,12 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
   override def restore(): Unit = {
     if (checkpointResult.isDefined) {
       // close previous result
-      checkpointResult match {
+      previousResult match {
         case Some(r) if r != checkpointResult.get =>
          r.close()
        case _ =>
      }
-      checkpointResult = checkpointResult
+      previousResult = checkpointResult
    }
  }

From 22b54cf979375fc8322f733e9dc66a3e6509f12b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Apr 2023 10:22:05 -0600
Subject: [PATCH 14/30] add comment

---
 .../com/nvidia/spark/rapids/GpuWindowExec.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 0c6771bf378..49ccedb0233 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1266,7 +1266,19 @@ trait BasicWindowCalc {
     }
   }
 
+  /**
+   * Compute the basic aggregations. In some cases the resulting columns may not be the expected
+   * types. This could be caused by cudf type differences and can be fixed by calling
+   * `castResultsIfNeeded` or it could be different because the window operations know about a
+   * post processing step that needs to happen prior to `castResultsIfNeeded`.
+   *
+   * @param cb the batch to do window aggregations on.
+   * @return the cudf columns that are the results of doing the aggregations.
+   */
   def computeBasicWindowSpillable(cb: SpillableColumnarBatch): Array[cudf.ColumnVector] = {
+    // this version of computeBasicWindow accepts a SpillableColumnarBatch instead
+    // of a regular ColumnarBatch but note that we also create a new SpillableColumnarBatch
+    // in this method, based on a projection
     closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
       val inputSpillable = SpillableColumnarBatch(
         GpuProjectExec.project(cb.getColumnarBatch(), initialProjections),
         SpillPriorities.ACTIVE_BATCHING_PRIORITY)

From b5cc792478680428b741e1047c1a6230a8f35f29 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 27 Apr 2023 15:29:39 -0600
Subject: [PATCH 15/30] remove retry from doAggsAndClose

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 101 +++++-------------
 .../spark/rapids/WindowRetrySuite.scala       |  10 +-
 2 files changed, 33 insertions(+), 78 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 49ccedb0233..26b88f96d18 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1116,34 +1116,32 @@ class GroupedAggregations {
    * after this before you get to a final result.
    */
   def doAggsAndClose(isRunningBatched: Boolean,
-      boundOrderSpec: Seq[SortOrder],
-      orderByPositions: Array[Int],
-      partByPositions: Array[Int],
-      inputSpillable: SpillableColumnarBatch,
-      outputColumns: Array[cudf.ColumnVector]): Unit = {
-    withRetryNoSplit(inputSpillable) { attempt =>
-      // when there are exceptions in this body, we always want to close
-      // `outputColumns` before a likely retry.
-      try {
-        withResource(attempt.getColumnarBatch()) { attemptCb =>
-          doRunningWindowOptimizedAggs(
-            isRunningBatched, partByPositions, attemptCb, outputColumns)
-          doRowAggs(
-            boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
-          doRangeAggs(
-            boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
-        }
-      } catch {
-        case t: Throwable =>
-          // on exceptions we want to throw away any columns in outputColumns that
-          // are not pass-through
-          val columnsToClose = outputColumns.filter(_ != null)
-          outputColumns.indices.foreach { col =>
-            outputColumns(col) = null
-          }
-          columnsToClose.safeClose(t)
-          throw t
+        boundOrderSpec: Seq[SortOrder],
+        orderByPositions: Array[Int],
+        partByPositions: Array[Int],
+        input: ColumnarBatch,
+        outputColumns: Array[cudf.ColumnVector]): Unit = {
+    // when there are exceptions in this body, we always want to close
+    // `outputColumns` before a likely retry.
+    try {
+      withResource(input) { attemptCb =>
+        doRunningWindowOptimizedAggs(
+          isRunningBatched, partByPositions, attemptCb, outputColumns)
+        doRowAggs(
+          boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
+        doRangeAggs(
+          boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
       }
+    } catch {
+      case t: Throwable =>
+        // on exceptions we want to throw away any columns in outputColumns that
+        // are not pass-through
+        val columnsToClose = outputColumns.filter(_ != null)
+        outputColumns.indices.foreach { col =>
+          outputColumns(col) = null
+        }
+        columnsToClose.safeClose(t)
+        throw t
     }
   }
 
@@ -1241,17 +1239,14 @@ trait BasicWindowCalc {
    */
   def computeBasicWindow(cb: ColumnarBatch): Array[cudf.ColumnVector] = {
     closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
-      val inputSpillable = SpillableColumnarBatch(
-        GpuProjectExec.project(cb, initialProjections),
-        SpillPriorities.ACTIVE_BATCHING_PRIORITY)
 
-      // this takes ownership of `inputSpillable`
+      // this takes ownership of `input`
       aggregations.doAggsAndClose(
         isRunningBatched,
         boundOrderSpec,
         orderByPositions,
         partByPositions,
-        inputSpillable,
+        input = GpuProjectExec.project(cb, initialProjections),
         outputColumns)
 
       // if the window aggregates were successful, lets splice the passThrough
@@ -1266,46 +1261,6 @@ trait BasicWindowCalc {
     }
   }
 
-  /**
-   * Compute the basic aggregations. In some cases the resulting columns may not be the expected
-   * types. This could be caused by cudf type differences and can be fixed by calling
-   * `castResultsIfNeeded` or it could be different because the window operations know about a
-   * post processing step that needs to happen prior to `castResultsIfNeeded`.
-   *
-   * @param cb the batch to do window aggregations on.
-   * @return the cudf columns that are the results of doing the aggregations.
-   */
-  def computeBasicWindowSpillable(cb: SpillableColumnarBatch): Array[cudf.ColumnVector] = {
-    // this version of computeBasicWindow accepts a SpillableColumnarBatch instead
-    // of a regular ColumnarBatch but note that we also create a new SpillableColumnarBatch
-    // in this method, based on a projection
-    closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
-      val inputSpillable = SpillableColumnarBatch(
-        GpuProjectExec.project(cb.getColumnarBatch(), initialProjections),
-        SpillPriorities.ACTIVE_BATCHING_PRIORITY)
-
-      // this takes ownership of `inputSpillable`
-      aggregations.doAggsAndClose(
-        isRunningBatched,
-        boundOrderSpec,
-        orderByPositions,
-        partByPositions,
-        inputSpillable,
-        outputColumns)
-
-      // if the window aggregates were successful, lets splice the passThrough
-      // columns
-      passThrough.foreach {
-        case (inputIndex, outputIndex) =>
-          outputColumns(outputIndex) =
-            cb.getColumnarBatch().column(inputIndex)
-              .asInstanceOf[GpuColumnVector].getBase.incRefCount()
-      }
-
-      outputColumns
-    }
-  }
-
   def convertToBatch(dataTypes: Array[DataType],
       cols: Array[cudf.ColumnVector]): ColumnarBatch =
     aggregations.convertToColumnarBatch(dataTypes, cols)
@@ -1532,7 +1487,7 @@ class GpuRunningWindowIterator(
       // note that computeBasicWindowSpillable also creates a spillable
       // batch based on a projection applied to cbSpillable so this probably
      // needs more work to avoid creating two spillable batches?
-      withResource(computeBasicWindowSpillable(cbSpillable)) { basic =>
+      withResource(computeBasicWindow(cbSpillable.getColumnarBatch())) { basic =>
        // we backup the fixers state and restore it in the event of a retry
        var newOrder: Option[Array[Scalar]] = None
        var newParts: Option[Array[Scalar]] = None
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala
index b15a98547b0..18410d3a3dc 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala
@@ -65,7 +65,7 @@ class WindowRetrySuite
       Seq.empty[SortOrder],
       Array.empty,
       Array.empty,
-      inputBatch,
+      inputBatch.getColumnarBatch(),
       outputColumns)
     withResource(outputColumns) { _ =>
       var rowsLeftToCheck = 4
@@ -94,7 +94,7 @@ class WindowRetrySuite
       Seq.empty[SortOrder],
       Array.empty,
      Array.empty,
-      inputBatch,
+      inputBatch.getColumnarBatch(),
      outputColumns)
    withResource(outputColumns) { _ =>
      var rowsLeftToCheck = 4
@@ -125,7 +125,7 @@ class WindowRetrySuite
      orderSpec,
      Array(0),
      Array.empty,
-      inputBatch,
+      inputBatch.getColumnarBatch(),
      outputColumns)
    withResource(outputColumns) { _ =>
      var rowsLeftToCheck = 4
@@ -159,7 +159,7 @@ class WindowRetrySuite
        Seq.empty[SortOrder],
        Array.empty,
        Array.empty,
-        inputBatch,
+        inputBatch.getColumnarBatch(),
        outputColumns)
    }
    // when we throw we must have closed any columns in `outputColumns` that are not null
@@ -183,7 +183,7 @@ class WindowRetrySuite
      Seq.empty[SortOrder],
      Array.empty,
      Array(1),
-      inputBatch,
+      inputBatch.getColumnarBatch(),
      outputColumns)
    withResource(outputColumns) { _ =>
      var rowsLeftToCheck = 4

From 7356e4be76001e4ecd320fc6686b7fc06eabdc53 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Apr 2023 12:28:48 -0600
Subject: [PATCH 16/30] move retry from doAgg to computeBasicWindow, fix test
 failures

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 62 +++++++------------
 .../spark/rapids/WindowRetrySuite.scala       |  5 +-
 2 files changed, 24 insertions(+), 43 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 26b88f96d18..bd37d9efa94 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
 import ai.rapids.cudf
 import ai.rapids.cudf.{AggregationOverWindow, DType, GroupByOptions, GroupByScanAggregation, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanAggregation, ScanType, Table, WindowOptions}
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed}
-import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit}
 import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode}
 
@@ -1115,34 +1114,15 @@ class GroupedAggregations {
    * Do all of the aggregations and put them in the output columns. There may be extra processing
    * after this before you get to a final result.
    */
-  def doAggsAndClose(isRunningBatched: Boolean,
-        boundOrderSpec: Seq[SortOrder],
-        orderByPositions: Array[Int],
-        partByPositions: Array[Int],
-        input: ColumnarBatch,
-        outputColumns: Array[cudf.ColumnVector]): Unit = {
-    // when there are exceptions in this body, we always want to close
-    // `outputColumns` before a likely retry.
-    try {
-      withResource(input) { attemptCb =>
-        doRunningWindowOptimizedAggs(
-          isRunningBatched, partByPositions, attemptCb, outputColumns)
-        doRowAggs(
-          boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
-        doRangeAggs(
-          boundOrderSpec, orderByPositions, partByPositions, attemptCb, outputColumns)
-      }
-    } catch {
-      case t: Throwable =>
-        // on exceptions we want to throw away any columns in outputColumns that
-        // are not pass-through
-        val columnsToClose = outputColumns.filter(_ != null)
-        outputColumns.indices.foreach { col =>
-          outputColumns(col) = null
-        }
-        columnsToClose.safeClose(t)
-        throw t
-    }
+  def doAggs(isRunningBatched: Boolean,
+      boundOrderSpec: Seq[SortOrder],
+      orderByPositions: Array[Int],
+      partByPositions: Array[Int],
+      inputCb: ColumnarBatch,
+      outputColumns: Array[cudf.ColumnVector]): Unit = {
+    doRunningWindowOptimizedAggs(isRunningBatched, partByPositions, inputCb, outputColumns)
+    doRowAggs(boundOrderSpec, orderByPositions, partByPositions, inputCb, outputColumns)
+    doRangeAggs(boundOrderSpec, orderByPositions, partByPositions, inputCb, outputColumns)
   }
 
   /**
@@ -1239,24 +1219,24 @@ trait BasicWindowCalc {
    */
   def computeBasicWindow(cb: ColumnarBatch): Array[cudf.ColumnVector] = {
     closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
-
-      // this takes ownership of `input`
-      aggregations.doAggsAndClose(
-        isRunningBatched,
-        boundOrderSpec,
-        orderByPositions,
-        partByPositions,
-        input = GpuProjectExec.project(cb, initialProjections),
-        outputColumns)
-
-      // if the window aggregates were successful, lets splice the passThrough
-      // columns
+      // First the pass through unchanged columns
       passThrough.foreach {
         case (inputIndex, outputIndex) =>
           outputColumns(outputIndex) =
             cb.column(inputIndex).asInstanceOf[GpuColumnVector].getBase.incRefCount()
       }
 
+      // make spillable
+      val spillableBatch = SpillableColumnarBatch(
+        GpuProjectExec.project(cb, initialProjections),
+        SpillPriorities.ACTIVE_BATCHING_PRIORITY
+      )
+
+      withRetryNoSplit(spillableBatch) { initProjCb =>
+        aggregations.doAggs(isRunningBatched, boundOrderSpec, orderByPositions,
+          partByPositions, initProjCb.getColumnarBatch(), outputColumns)
+      }
+
       outputColumns
     }
   }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala
index 18410d3a3dc..75d0fb5c2ce 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala
@@ -18,11 +18,10 @@ package com.nvidia.spark.rapids
 
 import ai.rapids.cudf._
 import com.nvidia.spark.rapids.Arm.withResource
-import com.nvidia.spark.rapids.jni.{RmmSpark, SplitAndRetryOOM}
 import org.mockito.Mockito._
 import org.scalatest.mockito.MockitoSugar
 
-import org.apache.spark.sql.catalyst.expressions.{Ascending, CurrentRow, ExprId, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.rapids.GpuCount
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
 
@@ -52,6 +51,7 @@ class WindowRetrySuite
     (groupAggs, new Array[ai.rapids.cudf.ColumnVector](windowOptsLength))
   }
 
+  /*
   test("row based window handles RetryOOM") {
     val inputBatch = buildInputBatch()
     val frame = GpuSpecifiedWindowFrame(
@@ -209,4 +209,5 @@ class WindowRetrySuite
     verify(inputBatch, times(2)).getColumnarBatch()
     verify(inputBatch, times(1)).close()
   }
+  */
 }

From 8bb8dd40ba7cf36ad320466c72f2c8ebb316ba76 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Apr 2023 15:43:53 -0600
Subject: [PATCH 17/30] fix double close, remove retry from computeBasicWindow

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 40 ++++++++++---------
 1 file changed, 21 insertions(+), 19 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index bd37d9efa94..9e9b3997d66 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1219,24 +1219,26 @@ trait BasicWindowCalc {
    */
   def computeBasicWindow(cb: ColumnarBatch): Array[cudf.ColumnVector] = {
     closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
-      // First the pass through unchanged columns
+
+      withResource(GpuProjectExec.project(cb, initialProjections)) { proj =>
+        // this takes ownership of `inputSpillable`
+        aggregations.doAggs(
+          isRunningBatched,
+          boundOrderSpec,
+          orderByPositions,
+          partByPositions,
+          proj,
+          outputColumns)
+      }
+
+      // if the window aggregates were successful, lets splice the passThrough
+      // columns
       passThrough.foreach {
         case (inputIndex, outputIndex) =>
           outputColumns(outputIndex) =
             cb.column(inputIndex).asInstanceOf[GpuColumnVector].getBase.incRefCount()
       }
 
-      // make spillable
-      val spillableBatch = SpillableColumnarBatch(
-        GpuProjectExec.project(cb, initialProjections),
-        SpillPriorities.ACTIVE_BATCHING_PRIORITY
-      )
-
-      withRetryNoSplit(spillableBatch) { initProjCb =>
-        aggregations.doAggs(isRunningBatched, boundOrderSpec, orderByPositions,
-          partByPositions, initProjCb.getColumnarBatch(), outputColumns)
-      }
-
       outputColumns
     }
   }
@@ -1541,13 +1543,13 @@ class GpuRunningWindowIterator(
   }
 
   override def next(): ColumnarBatch = {
-    withResource(readNextInputBatch()) { cb =>
-      withResource(new NvtxWithMetrics("RunningWindow", NvtxColor.CYAN, opTime)) { _ =>
-        val ret = computeRunning(cb)
-        numOutputBatches += 1
-        numOutputRows += ret.numRows()
-        ret
-      }
+    // TODO maybe should create spillable batch here before calling computeRunning
+    val cb = readNextInputBatch()
+    withResource(new NvtxWithMetrics("RunningWindow", NvtxColor.CYAN, opTime)) { _ =>
+      val ret = computeRunning(cb) // takes ownership of cb
+      numOutputBatches += 1
+      numOutputRows += ret.numRows()
+      ret
     }
   }

From 43c9f5528f9ba870c4ea049f038f17361c1a7e3b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Apr 2023 15:55:12 -0600
Subject: [PATCH 18/30] fix resource leak

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 60 ++++++++++---------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 9e9b3997d66..1c048bd3a52 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1466,42 +1466,46 @@ class GpuRunningWindowIterator(
     val cbSpillable = SpillableColumnarBatch(input, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
     withRetryNoSplit(cbSpillable) { _ =>
       withResource(computeBasicWindow(cbSpillable.getColumnarBatch())) { basic =>
         var newOrder: Option[Array[Scalar]] = None
         var newParts: Option[Array[Scalar]] = None
-        val fixedUp = try {
-          // we backup the fixers state and restore it in the event of a retry
-          withRestoreOnRetry(fixers.values.toSeq) {
-            withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+        val fixedUp = try {
+          // we backup the fixers state and restore it in the event of a retry
+          withRestoreOnRetry(fixers.values.toSeq) {
+            withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
               boundPartitionSpec)) { parts =>
-              val partColumns = GpuColumnVector.extractBases(parts)
-              withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
-                val fixedUp = if (fixerNeedsOrderMask) {
-                  withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+              val partColumns = GpuColumnVector.extractBases(parts)
+              withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
+                val fixedUp = if (fixerNeedsOrderMask) {
+                  withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
                     boundOrderColumns)) { order =>
-                    val orderColumns = GpuColumnVector.extractBases(order)
-                    // We need to fix up the rows that are part of the same batch as the end of the
-                    // last batch
-                    withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
-                      orderEqual =>
-                        closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
-                          fixedUp =>
-                            newOrder = Some(getScalarRow(numRows - 1, orderColumns))
-                            fixedUp
-                        }
+                    val orderColumns = GpuColumnVector.extractBases(order)
+                    // We need to fix up the rows that are part of the same batch as the end of the
+                    // last batch
+                    withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
+                      orderEqual =>
+                        closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
+                          fixedUp =>
+                            newOrder = Some(getScalarRow(numRows - 1, orderColumns))
+                            fixedUp
+                        }
+                    }
                   }
+                } else {
+                  // No ordering needed
+                  fixUpAll(basic, fixers, partsEqual, None)
                 }
-              } else {
-                // No ordering needed
-                fixUpAll(basic, fixers, partsEqual, None)
+                newParts = Some(getScalarRow(numRows - 1, partColumns))
+                fixedUp
               }
-              newParts = Some(getScalarRow(numRows - 1, partColumns))
-              fixedUp
             }
           }
+        } catch {
+          case t: Throwable =>
+            // avoid leaking unused interim results
+            newOrder.foreach(_.foreach(_.close()))
+            newParts.foreach(_.foreach(_.close()))
+            throw t
         }
-        } catch {
-          case t: Throwable =>
-            // avoid leaking unused interim results
-            newOrder.foreach(_.foreach(_.close()))
-            newParts.foreach(_.foreach(_.close()))
-            throw t
-        }
         // this section is outside of the retry logic because the calls to saveLastParts
         // and saveLastOrders can potentially close GPU resources
         withResource(fixedUp) { fixed =>
           closeOnExcept(fixedUp) { _ =>
             newOrder.foreach(saveLastOrder)
             newParts.foreach(saveLastParts)
           }
           convertToBatch(outputTypes, fixed)
         }

From 662a543f954d80809df8391e82d27f67b70b7ace Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 28 Apr 2023 15:58:45 -0600
Subject: [PATCH 19/30] remove comment that is no longer relevant

---
 .../src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 1c048bd3a52..87fd4ae5e06 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1221,7 +1221,6 @@ trait BasicWindowCalc {
     closeOnExcept(new Array[cudf.ColumnVector](boundWindowOps.length)) { outputColumns =>
 
       withResource(GpuProjectExec.project(cb, initialProjections)) { proj =>
-        // this takes ownership of `inputSpillable`
         aggregations.doAggs(
           isRunningBatched,
           boundOrderSpec,

From b1afce1841320ed0ec99bbbe1bda7bc0eee06b0f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 2 May 2023 16:32:43 -0600
Subject: [PATCH 20/30] fix one resource leak

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 83 ++++++++++---------
 1 file changed, 42 insertions(+), 41 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 87fd4ae5e06..3dfe37027a0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1462,58 +1462,59 @@ class GpuRunningWindowIterator(
   def computeRunning(input: ColumnarBatch): ColumnarBatch = {
     val fixers = fixerIndexMap
     val numRows = input.numRows()
     val cbSpillable = SpillableColumnarBatch(input, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
     withRetryNoSplit(cbSpillable) { _ =>
-      withResource(computeBasicWindow(cbSpillable.getColumnarBatch())) { basic =>
-        var newOrder: Option[Array[Scalar]] = None
-        var newParts: Option[Array[Scalar]] = None
-        val fixedUp = try {
-          // we backup the fixers state and restore it in the event of a retry
-          withRestoreOnRetry(fixers.values.toSeq) {
-            withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+      withResource(cbSpillable.getColumnarBatch()) { cb =>
+        withResource(computeBasicWindow(cb)) { basic =>
+          var newOrder: Option[Array[Scalar]] = None
+          var newParts: Option[Array[Scalar]] = None
+          val fixedUp = try {
+            // we backup the fixers state and restore it in the event of a retry
+            withRestoreOnRetry(fixers.values.toSeq) {
+              withResource(GpuProjectExec.project(cb,
                 boundPartitionSpec)) { parts =>
-              val partColumns = GpuColumnVector.extractBases(parts)
-              withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
-                val fixedUp = if (fixerNeedsOrderMask) {
-                  withResource(GpuProjectExec.project(cbSpillable.getColumnarBatch(),
+                val partColumns = GpuColumnVector.extractBases(parts)
+                withResourceIfAllowed(arePartsEqual(lastParts, partColumns)) { partsEqual =>
+                  val fixedUp = if (fixerNeedsOrderMask) {
+                    withResource(GpuProjectExec.project(cb,
                       boundOrderColumns)) { order =>
-                    val orderColumns = GpuColumnVector.extractBases(order)
-                    // We need to fix up the rows that are part of the same batch as the end of the
-                    // last batch
-                    withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
-                      orderEqual =>
-                        closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
-                          fixedUp =>
-                            newOrder = Some(getScalarRow(numRows - 1, orderColumns))
-                            fixedUp
-                        }
+                      val orderColumns = GpuColumnVector.extractBases(order)
+                      // We need to fix up the rows that are part of the same batch as the end of the
+                      // last batch
+                      withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
+                        orderEqual =>
+                          closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
+                            fixedUp =>
+                              newOrder = Some(getScalarRow(numRows - 1, orderColumns))
+                              fixedUp
+                          }
+                      }
                     }
+                  } else {
+                    // No ordering needed
+                    fixUpAll(basic, fixers, partsEqual, None)
                   }
-                } else {
-                  // No ordering needed
-                  fixUpAll(basic, fixers, partsEqual, None)
+                  newParts = Some(getScalarRow(numRows - 1, partColumns))
+                  fixedUp
                 }
-                newParts = Some(getScalarRow(numRows - 1, partColumns))
-                fixedUp
               }
             }
+          } catch {
+            case t: Throwable =>
+              // avoid leaking unused interim results
+              newOrder.foreach(_.foreach(_.close()))
+              newParts.foreach(_.foreach(_.close()))
+              throw t
           }
-        } catch {
-          case t: Throwable =>
-            // avoid leaking unused interim results
-            newOrder.foreach(_.foreach(_.close()))
-            newParts.foreach(_.foreach(_.close()))
-            throw t
-        }
-        // this section is outside of the retry logic because the calls to saveLastParts
-        // and saveLastOrders can potentially close GPU resources
-        withResource(fixedUp) { fixed =>
-          closeOnExcept(fixedUp) { _ =>
-            newOrder.foreach(saveLastOrder)
-            newParts.foreach(saveLastParts)
+          // this section is outside of the retry logic because the calls to saveLastParts
+          // and saveLastOrders can potentially close GPU resources
+          withResource(fixedUp) { fixed =>
+            closeOnExcept(fixedUp) { _ =>
+              newOrder.foreach(saveLastOrder)
+              newParts.foreach(saveLastParts)
+            }
+            convertToBatch(outputTypes, fixed)
           }
-          convertToBatch(outputTypes, fixed)
         }
       }
     }

From 26c9a0a6b614d8e2b1a38a99d1de5e4849ba9e6b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 2 May 2023 18:19:24 -0600
Subject: [PATCH 21/30] Add retry to GpuWindowIterator.hasNext

---
 .../com/nvidia/spark/rapids/GpuWindowExec.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 3dfe37027a0..72902de1a92 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1267,14 +1267,17 @@ class GpuWindowIterator(
   override def hasNext: Boolean = input.hasNext
 
   override def next(): ColumnarBatch = {
-    withResource(input.next()) { cb =>
-      withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ =>
-        val ret = withResource(computeBasicWindow(cb)) { cols =>
-          convertToBatch(outputTypes, cols)
+    val cbSpillable = SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY)
+    withRetryNoSplit(cbSpillable) { _ =>
+      withResource(cbSpillable.getColumnarBatch()) { cb =>
+        withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ =>
+          val ret = withResource(computeBasicWindow(cb)) { cols =>
+            convertToBatch(outputTypes, cols)
+          }
+          numOutputBatches += 1
+          numOutputRows += ret.numRows()
+          ret
         }
-        numOutputBatches += 1
-        numOutputRows += ret.numRows()
-        ret
       }
     }
   }

From 003acd82e5d4ac8b7b7d51eea2c0e2f954455116 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 3 May 2023 08:05:24 -0600
Subject: [PATCH 22/30] defensively reset checkpoints to None during restore

---
 .../main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 4 ++--
 .../scala/com/nvidia/spark/rapids/GpuWindowExpression.scala | 5 +++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 72902de1a92..66558fffe7e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1482,8 +1482,8 @@ class GpuRunningWindowIterator(
                     boundOrderColumns)) { order =>
                       val orderColumns = GpuColumnVector.extractBases(order)
-                      // We need to fix up the rows that are part of the same batch as the end of the
-                      // last batch
+                      // We need to fix up the rows that are part of the same batch as the end of
+                      // the last batch
                       withResourceIfAllowed(areOrdersEqual(lastOrder, orderColumns, partsEqual)) {
                         orderEqual =>
                           closeOnExcept(fixUpAll(basic, fixers, partsEqual, Some(orderEqual))) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 342da3a5367..ce2daa52261 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -992,6 +992,7 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
         case _ =>
       }
       previousResult = checkpointResult
+      checkpointResult = None
     }
   }
@@ -1062,6 +1063,7 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
         case _ =>
      }
      previousOverflow = checkpointOverflow
+      checkpointOverflow = None
    }
    if (checkpointResult.isDefined) {
      // close previous result
@@ -1071,6 +1073,7 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
        case _ =>
      }
      previousResult = checkpointResult
+      checkpointResult = None
    }
  }
@@ -1322,6 +1325,7 @@ class RankFixer extends BatchedRunningWindowFixer with Logging {
        case _ =>
      }
      previousRank = checkpointRank
+      checkpointRank = None
    }
  }
@@ -1439,6 +1443,7 @@ class DenseRankFixer extends BatchedRunningWindowFixer with Logging {
        case _ =>
      }
      previousRank = checkpointRank
+      checkpointRank = None
    }
  }

From 9071cce18fa30c03c0d333c7392bd357df461022 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 5 May 2023 11:40:11 -0600
Subject: [PATCH 23/30] address feedback

---
 .../com/nvidia/spark/rapids/GpuWindowExec.scala    | 14 ++++++--------
 .../nvidia/spark/rapids/GpuWindowExpression.scala  | 12 ++++++------
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index 6235d36fd47..ca835458c90 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1278,7 +1278,7 @@ class GpuWindowIterator(
   override def hasNext: Boolean = input.hasNext
 
   override def next(): ColumnarBatch = {
-    val cbSpillable = SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY)
+    val cbSpillable = SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
     withRetryNoSplit(cbSpillable) { _ =>
       withResource(cbSpillable.getColumnarBatch()) { cb =>
         withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ =>
@@ -1476,7 +1476,7 @@ class GpuRunningWindowIterator(
   def computeRunning(input: ColumnarBatch): ColumnarBatch = {
     val fixers = fixerIndexMap
     val numRows = input.numRows()
-    val cbSpillable = SpillableColumnarBatch(input, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
+    val cbSpillable = SpillableColumnarBatch(input, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
     withRetryNoSplit(cbSpillable) { _ =>
       withResource(cbSpillable.getColumnarBatch()) { cb =>
         withResource(computeBasicWindow(cb)) { basic =>
@@ -1522,12 +1522,10 @@ class GpuRunningWindowIterator(
           }
           // this section is outside of the retry logic because the calls to saveLastParts
           // and saveLastOrders can potentially close GPU resources
-          withResource(fixedUp) { fixed =>
-            closeOnExcept(fixedUp) { _ =>
-              newOrder.foreach(saveLastOrder)
-              newParts.foreach(saveLastParts)
-            }
-            convertToBatch(outputTypes, fixed)
+          withResource(fixedUp) { _ =>
+            newOrder.foreach(saveLastOrder)
+            newParts.foreach(saveLastParts)
+            convertToBatch(outputTypes, fixedUp)
           }
         }
       }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index ab47287f239..56615a74c48 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -977,22 +977,22 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String)
   private var previousResult: Option[Scalar] = None
 
   // checkpoint
-  private var checkpointResult: Option[Scalar] = None
+  private var checkpointPreviousResult: Option[Scalar] = None
 
   override def checkpoint(): Unit = {
-    checkpointResult = previousResult
+    checkpointPreviousResult = previousResult
   }
 
   override def restore(): Unit = {
-    if (checkpointResult.isDefined) {
+    if (checkpointPreviousResult.isDefined) {
       // close previous result
       previousResult match {
-        case Some(r) if r != checkpointResult.get =>
+        case Some(r) if r != checkpointPreviousResult.get =>
           r.close()
         case _ =>
       }
-      previousResult = checkpointResult
-      checkpointResult = None
+      previousResult = checkpointPreviousResult
+      checkpointPreviousResult = None
     }
   }

From 9abd3606f70b914d8ef58158bd3859a5615b695f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 5 May 2023 12:18:50 -0600
Subject: [PATCH 24/30] re-implement first unit test to use iterator

---
 .../nvidia/spark/rapids/GpuWindowExec.scala   | 17 +++++++++++++---
 .../spark/rapids/WindowRetrySuite.scala       | 59 +++++++++----------
 2 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
index ca835458c90..949b0b7e3e3 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala
@@ -1275,10 +1275,18 @@ class GpuWindowIterator(
 
   override def isRunningBatched: Boolean = false
 
-  override def hasNext: Boolean = input.hasNext
+  override def hasNext: Boolean = onDeck.isDefined || input.hasNext
+
+  var onDeck: Option[SpillableColumnarBatch] = None
 
   override def next():
ColumnarBatch = { - val cbSpillable = SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + val cbSpillable = onDeck match { + case Some(x) => + onDeck = None + x + case _ => + getNext() + } withRetryNoSplit(cbSpillable) { _ => withResource(cbSpillable.getColumnarBatch()) { cb => withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => @@ -1292,6 +1300,11 @@ class GpuWindowIterator( } } } + + def getNext(): SpillableColumnarBatch = { + SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + } object GpuBatchedWindowIterator { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala index 75d0fb5c2ce..44ccbc87a5e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala @@ -18,12 +18,13 @@ package com.nvidia.spark.rapids import ai.rapids.cudf._ import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.jni.RmmSpark import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar -import org.apache.spark.sql.catalyst.expressions.SortOrder +import org.apache.spark.sql.catalyst.expressions.{RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} import org.apache.spark.sql.rapids.GpuCount -import org.apache.spark.sql.types.{DataType, IntegerType, LongType} +import org.apache.spark.sql.types.{DataType, DataTypes, IntegerType, LongType} class WindowRetrySuite extends RmmSparkRetrySuiteBase @@ -34,53 +35,49 @@ class WindowRetrySuite .column(5L, null.asInstanceOf[java.lang.Long], 3L, 3L) .build() withResource(windowTable) { tbl => - val cb = GpuColumnVector.from(tbl, Seq(IntegerType, LongType).toArray[DataType]) - spy(SpillableColumnarBatch(cb, -1)) + GpuColumnVector.from(tbl, Seq(IntegerType, LongType).toArray[DataType]) } } - def setupCountAgg( + def setupWindowIterator( frame: GpuSpecifiedWindowFrame, - orderSpec: Seq[SortOrder] = Seq.empty): - (GroupedAggregations, Array[ai.rapids.cudf.ColumnVector]) = { - val groupAggs = new GroupedAggregations() + orderSpec: Seq[SortOrder] = Seq.empty): GpuWindowIterator = { val spec = GpuWindowSpecDefinition(Seq.empty, orderSpec, frame) val count = GpuWindowExpression(GpuCount(Seq(GpuLiteral.create(1, IntegerType))), spec) - groupAggs.addAggregation(count, Array(0), 2) - val windowOptsLength = 3 - (groupAggs, new Array[ai.rapids.cudf.ColumnVector](windowOptsLength)) + new GpuWindowIterator( + input = Seq(buildInputBatch()).iterator, + boundWindowOps = Seq(GpuAlias(count, "count")()), + boundPartitionSpec = Seq.empty, + boundOrderSpec = orderSpec, + outputTypes = Array(DataTypes.LongType), + numOutputBatches = NoopMetric, + numOutputRows = NoopMetric, + opTime = NoopMetric + ) } - /* test("row based window handles RetryOOM") { - val inputBatch = buildInputBatch() val frame = GpuSpecifiedWindowFrame( RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(UnboundedFollowing)) - val (groupAggs, outputColumns) = setupCountAgg(frame) + val it = setupWindowIterator(frame) + // pre-load a spillable batch before injecting the OOM + val spillableBatch = spy(it.getNext()) + it.onDeck = Some(spillableBatch) RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - groupAggs.doAggsAndClose( - false, - Seq.empty[SortOrder], - Array.empty, - Array.empty, - inputBatch.getColumnarBatch(), - outputColumns) - withResource(outputColumns) { _ => - var 
rowsLeftToCheck = 4 - withResource(outputColumns(2).copyToHost()) { hostCol => - (0 until hostCol.getRowCount.toInt).foreach { row => - assertResult(4)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } + val batch = it.next() + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { col => + (0 until batch.numRows().toInt).foreach { row => + assertResult(4)(col.getLong(row)) } - assertResult(0)(rowsLeftToCheck) } - verify(inputBatch, times(2)).getColumnarBatch() - verify(inputBatch, times(1)).close() + verify(spillableBatch, times(2)).getColumnarBatch() + verify(spillableBatch, times(1)).close() } + /* test("optimized-row based window handles RetryOOM") { val inputBatch = buildInputBatch() val frame = GpuSpecifiedWindowFrame( From 2f0de6e53796f1481704bed78ad80b19f9629328 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 May 2023 12:54:04 -0600 Subject: [PATCH 25/30] re-implement unit tests to call GpuWindowIterator --- .../spark/rapids/WindowRetrySuite.scala | 132 ++++++------------ 1 file changed, 41 insertions(+), 91 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala index 44ccbc87a5e..2e990f02262 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala @@ -18,11 +18,11 @@ package com.nvidia.spark.rapids import ai.rapids.cudf._ import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.jni.RmmSpark +import com.nvidia.spark.rapids.jni.{RmmSpark, SplitAndRetryOOM} import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar -import org.apache.spark.sql.catalyst.expressions.{RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} +import org.apache.spark.sql.catalyst.expressions.{Ascending, CurrentRow, ExprId, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} import org.apache.spark.sql.rapids.GpuCount import org.apache.spark.sql.types.{DataType, DataTypes, IntegerType, LongType} @@ -44,7 +44,7 @@ class WindowRetrySuite orderSpec: Seq[SortOrder] = Seq.empty): GpuWindowIterator = { val spec = GpuWindowSpecDefinition(Seq.empty, orderSpec, frame) val count = GpuWindowExpression(GpuCount(Seq(GpuLiteral.create(1, IntegerType))), spec) - new GpuWindowIterator( + val it = new GpuWindowIterator( input = Seq(buildInputBatch()).iterator, boundWindowOps = Seq(GpuAlias(count, "count")()), boundPartitionSpec = Seq.empty, @@ -54,6 +54,11 @@ class WindowRetrySuite numOutputRows = NoopMetric, opTime = NoopMetric ) + // pre-load a spillable batch before injecting the OOM + // and wrap in mockito + val spillableBatch = spy(it.getNext()) + it.onDeck = Some(spillableBatch) + it } test("row based window handles RetryOOM") { @@ -62,149 +67,94 @@ class WindowRetrySuite GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(UnboundedFollowing)) val it = setupWindowIterator(frame) - // pre-load a spillable batch before injecting the OOM - val spillableBatch = spy(it.getNext()) - it.onDeck = Some(spillableBatch) + val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) val batch = it.next() assertResult(4)(batch.numRows()) - withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { col => - (0 until batch.numRows().toInt).foreach { row => - assertResult(4)(col.getLong(row)) + 
withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + (0 until hostCol.getRowCount.toInt).foreach { row => + assertResult(4)(hostCol.getLong(row)) } } - verify(spillableBatch, times(2)).getColumnarBatch() - verify(spillableBatch, times(1)).close() + verify(inputBatch, times(2)).getColumnarBatch() + verify(inputBatch, times(1)).close() } - /* test("optimized-row based window handles RetryOOM") { - val inputBatch = buildInputBatch() val frame = GpuSpecifiedWindowFrame( RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow)) - val (groupAggs, outputColumns) = setupCountAgg(frame) + val it = setupWindowIterator(frame) + val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - groupAggs.doAggsAndClose( - false, - Seq.empty[SortOrder], - Array.empty, - Array.empty, - inputBatch.getColumnarBatch(), - outputColumns) - withResource(outputColumns) { _ => - var rowsLeftToCheck = 4 - withResource(outputColumns(2).copyToHost()) { hostCol => - (0 until hostCol.getRowCount.toInt).foreach { row => - assertResult(row + 1)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } + val batch = it.next() + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + (0 until hostCol.getRowCount.toInt).foreach { row => + assertResult(row + 1)(hostCol.getLong(row)) } - assertResult(0)(rowsLeftToCheck) } verify(inputBatch, times(2)).getColumnarBatch() verify(inputBatch, times(1)).close() } test("ranged based window handles RetryOOM") { - val inputBatch = buildInputBatch() val frame = GpuSpecifiedWindowFrame( RangeFrame, GpuLiteral.create(-1, IntegerType), GpuSpecialFrameBoundary(CurrentRow)) val child = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "test") val orderSpec = SortOrder(child, Ascending) :: Nil - val (groupAggs, outputColumns) = setupCountAgg(frame, orderSpec = orderSpec) + val it = setupWindowIterator(frame, orderSpec = orderSpec) + val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - groupAggs.doAggsAndClose( - false, - orderSpec, - Array(0), - Array.empty, - inputBatch.getColumnarBatch(), - outputColumns) - withResource(outputColumns) { _ => - var rowsLeftToCheck = 4 - withResource(outputColumns(2).copyToHost()) { hostCol => - (0 until hostCol.getRowCount.toInt).foreach { row => - assertResult(4)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } + val batch = it.next() + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + (0 until hostCol.getRowCount.toInt).foreach { row => + assertResult(4)(hostCol.getLong(row)) } - assertResult(0)(rowsLeftToCheck) } verify(inputBatch, times(2)).getColumnarBatch() verify(inputBatch, times(1)).close() } test("SplitAndRetryOOM is not handled in doAggs") { - val inputBatch = buildInputBatch() - val frame = GpuSpecifiedWindowFrame( RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow)) - val (groupAggs, outputColumns) = setupCountAgg(frame) - // simulate a successful window operation - val theMock = mock[ColumnVector] - outputColumns(0) = theMock + val it = setupWindowIterator(frame) + val inputBatch = it.onDeck.get RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1) assertThrows[SplitAndRetryOOM] { - groupAggs.doAggsAndClose( - false, - Seq.empty[SortOrder], - Array.empty, - Array.empty, - inputBatch.getColumnarBatch(), - outputColumns) + 
it.next() } - // when we throw we must have closed any columns in `outputColumns` that are not null - // and we would have marked them null - assertResult(null)(outputColumns(0)) - verify(theMock, times(1)).close() verify(inputBatch, times(1)).getColumnarBatch() verify(inputBatch, times(1)).close() } test("row based group by window handles RetryOOM") { - val inputBatch = buildInputBatch() val frame = GpuSpecifiedWindowFrame( RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow)) - val (groupAggs, outputColumns) = setupCountAgg(frame) + val it = setupWindowIterator(frame) + val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - groupAggs.doAggsAndClose( - false, - Seq.empty[SortOrder], - Array.empty, - Array(1), - inputBatch.getColumnarBatch(), - outputColumns) - withResource(outputColumns) { _ => - var rowsLeftToCheck = 4 - withResource(outputColumns(2).copyToHost()) { hostCol => - (0 until hostCol.getRowCount.toInt).foreach { row => - if (row == 0) { // 5 - assertResult(1)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } else if (row == 1) { // null - assertResult(1)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } else if (row == 2) { // 3 - assertResult(1)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } else if (row == 3) { // 3 - assertResult(2)(hostCol.getLong(row)) - rowsLeftToCheck -= 1 - } + val batch = it.next() + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + (0 until hostCol.getRowCount.toInt).foreach { row => + val expected = row match { + case 3 => 3 + case _ => 1 } + assertResult(expected)(hostCol.getLong(row)) } - assertResult(0)(rowsLeftToCheck) } verify(inputBatch, times(2)).getColumnarBatch() verify(inputBatch, times(1)).close() } - */ } From c56fe60302a482339edfe87fd766875d42324e81 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 May 2023 13:58:30 -0600 Subject: [PATCH 26/30] fix error in test, add more assertions --- .../scala/com/nvidia/spark/rapids/WindowRetrySuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala index 2e990f02262..38627d49bd6 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala @@ -72,6 +72,7 @@ class WindowRetrySuite val batch = it.next() assertResult(4)(batch.numRows()) withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) (0 until hostCol.getRowCount.toInt).foreach { row => assertResult(4)(hostCol.getLong(row)) } @@ -91,6 +92,7 @@ class WindowRetrySuite val batch = it.next() assertResult(4)(batch.numRows()) withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) (0 until hostCol.getRowCount.toInt).foreach { row => assertResult(row + 1)(hostCol.getLong(row)) } @@ -112,6 +114,7 @@ class WindowRetrySuite val batch = it.next() assertResult(4)(batch.numRows()) withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) (0 until hostCol.getRowCount.toInt).foreach { row => assertResult(4)(hostCol.getLong(row)) } @@ -146,9 +149,10 @@ class WindowRetrySuite val batch = it.next() assertResult(4)(batch.numRows()) 
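    // the frame is unbounded-preceding to unbounded-following with no partitioning,
    // so even after the forced retry every row sees the whole 4-row batch as its
    // window, i.e. count(1) == 4 for all rows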
withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) (0 until hostCol.getRowCount.toInt).foreach { row => val expected = row match { - case 3 => 3 + case 3 => 2 case _ => 1 } assertResult(expected)(hostCol.getLong(row)) From 189df7dca5a7f5ed4dc0d1d9dc52274345201d13 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 May 2023 18:02:05 -0600 Subject: [PATCH 27/30] fix segfault --- .../spark/rapids/WindowRetrySuite.scala | 81 ++++++++++--------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala index 38627d49bd6..85eb632407b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala @@ -41,13 +41,14 @@ class WindowRetrySuite def setupWindowIterator( frame: GpuSpecifiedWindowFrame, - orderSpec: Seq[SortOrder] = Seq.empty): GpuWindowIterator = { + orderSpec: Seq[SortOrder] = Seq.empty, + boundPartitionSpec: Seq[GpuExpression] = Seq.empty): GpuWindowIterator = { val spec = GpuWindowSpecDefinition(Seq.empty, orderSpec, frame) val count = GpuWindowExpression(GpuCount(Seq(GpuLiteral.create(1, IntegerType))), spec) val it = new GpuWindowIterator( input = Seq(buildInputBatch()).iterator, boundWindowOps = Seq(GpuAlias(count, "count")()), - boundPartitionSpec = Seq.empty, + boundPartitionSpec, boundOrderSpec = orderSpec, outputTypes = Array(DataTypes.LongType), numOutputBatches = NoopMetric, @@ -69,16 +70,17 @@ class WindowRetrySuite val it = setupWindowIterator(frame) val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - val batch = it.next() - assertResult(4)(batch.numRows()) - withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => - assertResult(4)(hostCol.getRowCount) - (0 until hostCol.getRowCount.toInt).foreach { row => - assertResult(4)(hostCol.getLong(row)) + withResource(it.next()) { batch => + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) + (0 until hostCol.getRowCount.toInt).foreach { row => + assertResult(4)(hostCol.getLong(row)) + } } + verify(inputBatch, times(2)).getColumnarBatch() + verify(inputBatch, times(1)).close() } - verify(inputBatch, times(2)).getColumnarBatch() - verify(inputBatch, times(1)).close() } test("optimized-row based window handles RetryOOM") { @@ -89,16 +91,17 @@ class WindowRetrySuite val it = setupWindowIterator(frame) val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - val batch = it.next() - assertResult(4)(batch.numRows()) - withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => - assertResult(4)(hostCol.getRowCount) - (0 until hostCol.getRowCount.toInt).foreach { row => - assertResult(row + 1)(hostCol.getLong(row)) + withResource(it.next()) { batch => + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) + (0 until hostCol.getRowCount.toInt).foreach { row => + assertResult(row + 1)(hostCol.getLong(row)) + } } + verify(inputBatch, times(2)).getColumnarBatch() + verify(inputBatch, times(1)).close() } - verify(inputBatch, times(2)).getColumnarBatch() - verify(inputBatch, times(1)).close() } 
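  // a RANGE frame is bounded by values in the order-by column rather than by row
  // offsets, so the next test also supplies an orderSpec to exercise that path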
test("ranged based window handles RetryOOM") { @@ -111,16 +114,17 @@ class WindowRetrySuite val it = setupWindowIterator(frame, orderSpec = orderSpec) val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - val batch = it.next() - assertResult(4)(batch.numRows()) - withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => - assertResult(4)(hostCol.getRowCount) - (0 until hostCol.getRowCount.toInt).foreach { row => - assertResult(4)(hostCol.getLong(row)) + withResource(it.next()) { batch => + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) + (0 until hostCol.getRowCount.toInt).foreach { row => + assertResult(4)(hostCol.getLong(row)) + } } + verify(inputBatch, times(2)).getColumnarBatch() + verify(inputBatch, times(1)).close() } - verify(inputBatch, times(2)).getColumnarBatch() - verify(inputBatch, times(1)).close() } test("SplitAndRetryOOM is not handled in doAggs") { @@ -143,22 +147,23 @@ class WindowRetrySuite RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow)) - val it = setupWindowIterator(frame) + val it = setupWindowIterator(frame, boundPartitionSpec = Seq(GpuLiteral.create(1, IntegerType))) val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) - val batch = it.next() - assertResult(4)(batch.numRows()) - withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => - assertResult(4)(hostCol.getRowCount) - (0 until hostCol.getRowCount.toInt).foreach { row => - val expected = row match { - case 3 => 2 - case _ => 1 + withResource(it.next()) { batch => + assertResult(4)(batch.numRows()) + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => + assertResult(4)(hostCol.getRowCount) + (0 until hostCol.getRowCount.toInt).foreach { row => + val expected = row match { + case 3 => 2 + case _ => 1 + } + assertResult(expected)(hostCol.getLong(row)) } - assertResult(expected)(hostCol.getLong(row)) } + verify(inputBatch, times(2)).getColumnarBatch() + verify(inputBatch, times(1)).close() } - verify(inputBatch, times(2)).getColumnarBatch() - verify(inputBatch, times(1)).close() } } From 911e8573f680b79634552848bfc718dfb0374d9d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 May 2023 12:37:25 -0600 Subject: [PATCH 28/30] fix test --- .../spark/rapids/WindowRetrySuite.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala index 85eb632407b..bfcf350e823 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala @@ -31,11 +31,11 @@ class WindowRetrySuite with MockitoSugar { private def buildInputBatch() = { val windowTable = new Table.TestBuilder() - .column(1.asInstanceOf[java.lang.Integer], 1, 1, 1) .column(5L, null.asInstanceOf[java.lang.Long], 3L, 3L) + .column(1.asInstanceOf[java.lang.Integer], 1, 1, 1) .build() withResource(windowTable) { tbl => - GpuColumnVector.from(tbl, Seq(IntegerType, LongType).toArray[DataType]) + GpuColumnVector.from(tbl, Seq(LongType, IntegerType).toArray[DataType]) } } @@ -109,7 +109,7 @@ class WindowRetrySuite RangeFrame, GpuLiteral.create(-1, IntegerType), GpuSpecialFrameBoundary(CurrentRow)) - val 
child = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "test") + val child = GpuBoundReference(1, IntegerType, nullable = false)(ExprId(0), "test") val orderSpec = SortOrder(child, Ascending) :: Nil val it = setupWindowIterator(frame, orderSpec = orderSpec) val inputBatch = it.onDeck.get @@ -147,7 +147,8 @@ class WindowRetrySuite RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow)) - val it = setupWindowIterator(frame, boundPartitionSpec = Seq(GpuLiteral.create(1, IntegerType))) + val it = setupWindowIterator(frame, boundPartitionSpec = + Seq(GpuBoundReference(0, DataTypes.LongType, false)(ExprId.apply(0), "tbd"))) val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) withResource(it.next()) { batch => @@ -155,11 +156,15 @@ class WindowRetrySuite withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hostCol => assertResult(4)(hostCol.getRowCount) (0 until hostCol.getRowCount.toInt).foreach { row => - val expected = row match { - case 3 => 2 - case _ => 1 + if (row == 0) { // 5 + assertResult(1)(hostCol.getLong(row)) + } else if (row == 1) { // null + assertResult(1)(hostCol.getLong(row)) + } else if (row == 2) { // 3 + assertResult(1)(hostCol.getLong(row)) + } else if (row == 3) { // 3 + assertResult(2)(hostCol.getLong(row)) } - assertResult(expected)(hostCol.getLong(row)) } } verify(inputBatch, times(2)).getColumnarBatch() From 0c066eca3bf77794adf0090dcf65edea8782fbd0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 May 2023 12:43:49 -0600 Subject: [PATCH 29/30] revert column order --- .../scala/com/nvidia/spark/rapids/WindowRetrySuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala index bfcf350e823..7bfef4e0f1e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowRetrySuite.scala @@ -31,11 +31,11 @@ class WindowRetrySuite with MockitoSugar { private def buildInputBatch() = { val windowTable = new Table.TestBuilder() - .column(5L, null.asInstanceOf[java.lang.Long], 3L, 3L) .column(1.asInstanceOf[java.lang.Integer], 1, 1, 1) + .column(5L, null.asInstanceOf[java.lang.Long], 3L, 3L) .build() withResource(windowTable) { tbl => - GpuColumnVector.from(tbl, Seq(LongType, IntegerType).toArray[DataType]) + GpuColumnVector.from(tbl, Seq(IntegerType, LongType).toArray[DataType]) } } @@ -109,7 +109,7 @@ class WindowRetrySuite RangeFrame, GpuLiteral.create(-1, IntegerType), GpuSpecialFrameBoundary(CurrentRow)) - val child = GpuBoundReference(1, IntegerType, nullable = false)(ExprId(0), "test") + val child = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "test") val orderSpec = SortOrder(child, Ascending) :: Nil val it = setupWindowIterator(frame, orderSpec = orderSpec) val inputBatch = it.onDeck.get @@ -148,7 +148,7 @@ class WindowRetrySuite GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow)) val it = setupWindowIterator(frame, boundPartitionSpec = - Seq(GpuBoundReference(0, DataTypes.LongType, false)(ExprId.apply(0), "tbd"))) + Seq(GpuBoundReference(1, DataTypes.LongType, false)(ExprId.apply(0), "tbd"))) val inputBatch = it.onDeck.get RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1) withResource(it.next()) { batch => From 02e2c472e8e9c497b5fd1705f35ff2d5698aae1b Mon Sep 17 00:00:00 2001 
From: Andy Grove Date: Mon, 8 May 2023 12:44:08 -0600 Subject: [PATCH 30/30] remove TODO comment --- .../src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 949b0b7e3e3..8d235073913 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -1572,7 +1572,6 @@ class GpuRunningWindowIterator( } override def next(): ColumnarBatch = { - // TODO maybe should create spillable batch here before calling computeRunning val cb = readNextInputBatch() withResource(new NvtxWithMetrics("RunningWindow", NvtxColor.CYAN, opTime)) { _ => val ret = computeRunning(cb) // takes ownership of cb