diff --git a/docs/additional-functionality/rapids-shuffle.md b/docs/additional-functionality/rapids-shuffle.md
index 422e078aeed..51c775cb34c 100644
--- a/docs/additional-functionality/rapids-shuffle.md
+++ b/docs/additional-functionality/rapids-shuffle.md
@@ -28,6 +28,7 @@ in our plugin:
| 3.3.0 | com.nvidia.spark.rapids.spark330.RapidsShuffleManager |
| 3.3.1 | com.nvidia.spark.rapids.spark331.RapidsShuffleManager |
| 3.3.2 | com.nvidia.spark.rapids.spark332.RapidsShuffleManager |
+| 3.3.3 | com.nvidia.spark.rapids.spark333.RapidsShuffleManager |
| 3.4.0 | com.nvidia.spark.rapids.spark340.RapidsShuffleManager |
| 3.4.1 | com.nvidia.spark.rapids.spark341.RapidsShuffleManager |
| Databricks 10.4 | com.nvidia.spark.rapids.spark321db.RapidsShuffleManager |
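As a usage note (not part of the patch itself): the new 3.3.3 shim is selected the same way as the others in the table above. A minimal sketch, assuming a Spark 3.3.3 cluster with the RAPIDS plugin jars on the classpath:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable the plugin and pick the 3.3.3 shuffle manager shim.
// The config keys are standard Spark settings; the cluster setup is assumed.
val spark = SparkSession.builder()
  .appName("rapids-shuffle-333")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.shuffle.manager",
    "com.nvidia.spark.rapids.spark333.RapidsShuffleManager")
  .getOrCreate()
```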
diff --git a/pom.xml b/pom.xml
index 2352681c8ca..789e958b224 100644
--- a/pom.xml
+++ b/pom.xml
@@ -605,7 +605,7 @@
<spark330.version>3.3.0</spark330.version>
<spark331.version>3.3.1</spark331.version>
<spark332.version>3.3.2</spark332.version>
- <spark333.version>3.3.3-SNAPSHOT</spark333.version>
+ <spark333.version>3.3.3</spark333.version>
<spark340.version>3.4.0</spark340.version>
<spark341.version>3.4.1</spark341.version>
<spark330cdh.version>3.3.0.3.3.7180.0-274</spark330cdh.version>
@@ -651,12 +651,12 @@
330,
331,
332,
+ 333,
330cdh,
340,
341
</noSnapshot.buildvers>
<snapshot.buildvers>
- 333
</snapshot.buildvers>
<databricks.buildvers>
321db,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
index 80b324de0ae..72a2edaef31 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
@@ -20,8 +20,9 @@ import scala.annotation.tailrec
import scala.collection.mutable.Queue
import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
-import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -73,7 +74,12 @@ class AcceleratedColumnarToRowIterator(
private var at: Int = 0
private var total: Int = 0
- onTaskCompletion(closeAllPendingBatches())
+ // Don't install the callback when there is no TaskContext, e.g. in unit tests
+ Option(TaskContext.get()).foreach { tc =>
+ onTaskCompletion(tc) {
+ closeAllPendingBatches()
+ }
+ }
private def setCurrentBatch(wip: HostColumnVector): Unit = {
currentCv = Some(wip)
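The guard above matters because `TaskContext.get()` returns null outside a running Spark task, which is exactly the situation in the new unit tests below. A roughly equivalent sketch using Spark's public listener API (illustrative only; the plugin uses its own `onTaskCompletion` helper):

```scala
import org.apache.spark.TaskContext

// Register cleanup only when a real task context exists; in unit tests the
// caller owns cleanup of any pending batches.
Option(TaskContext.get()).foreach { tc =>
  tc.addTaskCompletionListener[Unit](_ => closeAllPendingBatches())
}
```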
@@ -100,34 +106,42 @@ class AcceleratedColumnarToRowIterator(
new Table(rearrangedColumns : _*)
}
- private[this] def setupBatch(cb: ColumnarBatch): Boolean = {
+ private[this] def setupBatchAndClose(scb: SpillableColumnarBatch): Boolean = {
numInputBatches += 1
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
- numOutputRows += cb.numRows()
- if (cb.numRows() > 0) {
+ numOutputRows += scb.numRows()
+ if (scb.numRows() > 0) {
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
- withResource(rearrangeRows(cb)) { table =>
- // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at
- // most 184 double/long values. Spark by default limits codegen to 100 fields
- // "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that
- // until we have tested it more. We branching over the size of the output to know which
- // kernel to call. If schema.length < 100 we call the fixed-width optimized version,
- // otherwise the generic one
- withResource(if (schema.length < 100) {
- table.convertToRowsFixedWidthOptimized()
- } else {
- table.convertToRows()
- }) { rowsCvList =>
+ val it = RmmRapidsRetryIterator.withRetry(scb, splitSpillableInHalfByRows) { attempt =>
+ withResource(attempt.getColumnarBatch()) { attemptCb =>
+ withResource(rearrangeRows(attemptCb)) { table =>
+ // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which
+ // means at most 184 double/long values. Spark by default limits codegen to 100
+ // fields "spark.sql.codegen.maxFields". So, we are going to be cautious and
+ // start with that until we have tested it more. We branch on the size of
+ // the output to know which kernel to call: if schema.length < 100 we call the
+ // fixed-width optimized version, otherwise the generic one.
+ if (schema.length < 100) {
+ table.convertToRowsFixedWidthOptimized()
+ } else {
+ table.convertToRows()
+ }
+ }
+ }
+ }
+ assert(it.hasNext, "Got an unexpected empty iterator after setting up batch with retry")
+ it.foreach { rowsCvList =>
+ withResource(rowsCvList) { _ =>
rowsCvList.foreach { rowsCv =>
pendingCvs += rowsCv.copyToHost()
}
- setCurrentBatch(pendingCvs.dequeue())
- return true
}
}
+ setCurrentBatch(pendingCvs.dequeue())
+ return true
}
}
false
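For readers new to the retry framework, the hunk above follows this general shape; the exact `withRetry` semantics are inferred from its use here, so treat this as a sketch rather than a definitive description of the API:

```scala
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows

// Sketch (assumed semantics): one result per successful attempt; a retryable
// OOM re-runs the body, while a split-and-retry OOM first halves the spillable
// input by rows, which is why the result is an Iterator.
def rowCounts(scb: SpillableColumnarBatch): Iterator[Long] =
  RmmRapidsRetryIterator.withRetry(scb, splitSpillableInHalfByRows) { attempt =>
    withResource(attempt.getColumnarBatch()) { cb =>
      cb.numRows().toLong // stand-in for the real conversion work
    }
  }
```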
@@ -148,16 +162,20 @@ class AcceleratedColumnarToRowIterator(
// keep fetching input batches until we have a non-empty batch ready
val nextBatch = fetchNextBatch()
if (nextBatch.isDefined) {
- if (!withResource(nextBatch.get)(setupBatch)) {
+ if (!setupBatchAndClose(nextBatch.get)) {
populateBatch()
}
}
}
- private def fetchNextBatch(): Option[ColumnarBatch] = {
+ private def fetchNextBatch(): Option[SpillableColumnarBatch] = {
withResource(new NvtxWithMetrics("ColumnarToRow: fetch", NvtxColor.BLUE, streamTime)) { _ =>
if (batches.hasNext) {
- Some(batches.next())
+ // Make the batch spillable as soon as it is fetched.
+ val spillBatch = closeOnExcept(batches.next()) { cb =>
+ SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
+ }
+ Some(spillBatch)
} else {
None
}
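`closeOnExcept` is used above so the freshly fetched batch is not leaked if constructing the `SpillableColumnarBatch` wrapper throws. A minimal re-implementation of the contract, written here purely for illustration (the plugin ships its own version in `Arm`):

```scala
// Close the resource only if the body throws; on success, ownership moves to
// whatever the body produced (here, the spillable wrapper).
def closeOnExceptSketch[T <: AutoCloseable, R](resource: T)(body: T => R): R = {
  try {
    body(resource)
  } catch {
    case t: Throwable =>
      resource.close() // failure path: release the resource before rethrowing
      throw t
  }
}
```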
diff --git a/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala b/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala
index 00bb99df8fa..f329546de6a 100644
--- a/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala
+++ b/sql-plugin/src/main/spark333/scala/com/nvidia/spark/rapids/shims/spark333/SparkShimServiceProvider.scala
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.SparkShimVersion
object SparkShimServiceProvider {
val VERSION = SparkShimVersion(3, 3, 3)
- val VERSIONNAMES = Seq(s"$VERSION", s"$VERSION-SNAPSHOT")
+ val VERSIONNAMES = Seq(s"$VERSION")
}
class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {
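With the `-SNAPSHOT` alias removed, only the exact release string can match. Assuming the provider matches by exact string comparison against `VERSIONNAMES` (a sketch of the likely behavior, not the verified implementation):

```scala
// After this change, "3.3.3" matches but "3.3.3-SNAPSHOT" no longer does.
def matchesVersion(version: String): Boolean =
  SparkShimServiceProvider.VERSIONNAMES.contains(version)
```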
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ColumnToRowIteratorRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ColumnToRowIteratorRetrySuite.scala
new file mode 100644
index 00000000000..6b66f4ecca7
--- /dev/null
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ColumnToRowIteratorRetrySuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import ai.rapids.cudf.ColumnVector
+import com.nvidia.spark.rapids.jni.RmmSpark
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase {
+
+ private val ref = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "a")
+ private val attrs = Seq(AttributeReference(ref.name, ref.dataType, ref.nullable)())
+ private val NUM_ROWS = 50
+
+ private def buildBatch: ColumnarBatch = {
+ val ints = 0 until NUM_ROWS
+ new ColumnarBatch(
+ Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), NUM_ROWS)
+ }
+
+ test("Accelerated columnar to row with retry OOM") {
+ val aCol2RowIter = new AcceleratedColumnarToRowIterator(
+ attrs,
+ Iterator(buildBatch),
+ NoopMetric, NoopMetric, NoopMetric, NoopMetric)
+ RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
+ var numRows = 0
+ aCol2RowIter.foreach { _ =>
+ numRows += 1
+ }
+ assertResult(NUM_ROWS)(numRows)
+ }
+
+ test("Accelerated columnar_to_row with split and retry OOM") {
+ val aCol2RowIter = new AcceleratedColumnarToRowIterator(
+ attrs,
+ Iterator(buildBatch),
+ NoopMetric, NoopMetric, NoopMetric, NoopMetric)
+ RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
+ var numRows = 0
+ aCol2RowIter.foreach { _ =>
+ numRows += 1
+ }
+ assertResult(NUM_ROWS)(numRows)
+ }
+
+}