Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/branch-23.10' into column-overfl…
Browse files Browse the repository at this point in the history
…ow-retry
  • Loading branch information
mythrocks committed Aug 24, 2023
2 parents 5298e92 + daedfe5 commit ad48d46
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/additional-functionality/rapids-shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ in our plugin:
| 3.3.0 | com.nvidia.spark.rapids.spark330.RapidsShuffleManager |
| 3.3.1 | com.nvidia.spark.rapids.spark331.RapidsShuffleManager |
| 3.3.2 | com.nvidia.spark.rapids.spark332.RapidsShuffleManager |
| 3.3.3 | com.nvidia.spark.rapids.spark333.RapidsShuffleManager |
| 3.4.0 | com.nvidia.spark.rapids.spark340.RapidsShuffleManager |
| 3.4.1 | com.nvidia.spark.rapids.spark341.RapidsShuffleManager |
| Databricks 10.4 | com.nvidia.spark.rapids.spark321db.RapidsShuffleManager |
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@
<spark330.version>3.3.0</spark330.version>
<spark331.version>3.3.1</spark331.version>
<spark332.version>3.3.2</spark332.version>
<spark333.version>3.3.3-SNAPSHOT</spark333.version>
<spark333.version>3.3.3</spark333.version>
<spark340.version>3.4.0</spark340.version>
<spark341.version>3.4.1</spark341.version>
<spark330cdh.version>3.3.0.3.3.7180.0-274</spark330cdh.version>
Expand Down Expand Up @@ -651,12 +651,12 @@
330,
331,
332,
333,
330cdh,
340,
341
</noSnapshot.buildvers>
<snapshot.buildvers>
333
</snapshot.buildvers>
<databricks.buildvers>
321db,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import scala.annotation.tailrec
import scala.collection.mutable.Queue

import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode

Expand Down Expand Up @@ -73,7 +74,12 @@ class AcceleratedColumnarToRowIterator(
private var at: Int = 0
private var total: Int = 0

onTaskCompletion(closeAllPendingBatches())
// Don't install the callback if in a unit test
Option(TaskContext.get()).foreach { tc =>
onTaskCompletion(tc) {
closeAllPendingBatches()
}
}

private def setCurrentBatch(wip: HostColumnVector): Unit = {
currentCv = Some(wip)
Expand All @@ -100,34 +106,42 @@ class AcceleratedColumnarToRowIterator(
new Table(rearrangedColumns : _*)
}

private[this] def setupBatch(cb: ColumnarBatch): Boolean = {
private[this] def setupBatchAndClose(scb: SpillableColumnarBatch): Boolean = {
numInputBatches += 1
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
if (cb.numRows() > 0) {
numOutputRows += scb.numRows()
if (scb.numRows() > 0) {
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
withResource(rearrangeRows(cb)) { table =>
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at
// most 184 double/long values. Spark by default limits codegen to 100 fields
// "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that
// until we have tested it more. We branching over the size of the output to know which
// kernel to call. If schema.length < 100 we call the fixed-width optimized version,
// otherwise the generic one
withResource(if (schema.length < 100) {
table.convertToRowsFixedWidthOptimized()
} else {
table.convertToRows()
}) { rowsCvList =>
val it = RmmRapidsRetryIterator.withRetry(scb, splitSpillableInHalfByRows) { attempt =>
withResource(attempt.getColumnarBatch()) { attemptCb =>
withResource(rearrangeRows(attemptCb)) { table =>
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which
// means at most 184 double/long values. Spark by default limits codegen to 100
// fields "spark.sql.codegen.maxFields". So, we are going to be cautious and
// start with that until we have tested it more. We branching over the size of
// the output to know which kernel to call. If schema.length < 100 we call the
// fixed-width optimized version, otherwise the generic one
if (schema.length < 100) {
table.convertToRowsFixedWidthOptimized()
} else {
table.convertToRows()
}
}
}
}
assert(it.hasNext, "Got an unexpected empty iterator after setting up batch with retry")
it.foreach { rowsCvList =>
withResource(rowsCvList) { _ =>
rowsCvList.foreach { rowsCv =>
pendingCvs += rowsCv.copyToHost()
}
setCurrentBatch(pendingCvs.dequeue())
return true
}
}
setCurrentBatch(pendingCvs.dequeue())
return true
}
}
false
Expand All @@ -148,16 +162,20 @@ class AcceleratedColumnarToRowIterator(
// keep fetching input batches until we have a non-empty batch ready
val nextBatch = fetchNextBatch()
if (nextBatch.isDefined) {
if (!withResource(nextBatch.get)(setupBatch)) {
if (!setupBatchAndClose(nextBatch.get)) {
populateBatch()
}
}
}

private def fetchNextBatch(): Option[ColumnarBatch] = {
private def fetchNextBatch(): Option[SpillableColumnarBatch] = {
withResource(new NvtxWithMetrics("ColumnarToRow: fetch", NvtxColor.BLUE, streamTime)) { _ =>
if (batches.hasNext) {
Some(batches.next())
// Make it spillable once getting a columnar batch.
val spillBatch = closeOnExcept(batches.next()) { cb =>
SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
}
Some(spillBatch)
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.SparkShimVersion

object SparkShimServiceProvider {
val VERSION = SparkShimVersion(3, 3, 3)
val VERSIONNAMES = Seq(s"$VERSION", s"$VERSION-SNAPSHOT")
val VERSIONNAMES = Seq(s"$VERSION")
}

class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.ColumnarBatch

class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase {

private val ref = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "a")
private val attrs = Seq(AttributeReference(ref.name, ref.dataType, ref.nullable)())
private val NUM_ROWS = 50

private def buildBatch: ColumnarBatch = {
val ints = 0 until NUM_ROWS
new ColumnarBatch(
Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), NUM_ROWS)
}

test("Accelerated columnar to row with retry OOM") {
val aCol2RowIter = new AcceleratedColumnarToRowIterator(
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
}
assertResult(NUM_ROWS)(numRows)
}

test("Accelerated columnar_to_row with split and retry OOM") {
val aCol2RowIter = new AcceleratedColumnarToRowIterator(
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
}
assertResult(NUM_ROWS)(numRows)
}

}

0 comments on commit ad48d46

Please sign in to comment.