From 0e04172c93b8d85cdae0449c5d22d4e0f148aeec Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 22 Feb 2023 08:34:51 -0600 Subject: [PATCH] Use withRetry in GpuCoalesceBatches Signed-off-by: Alessandro Bellina --- .../spark/rapids/GpuCoalesceBatches.scala | 387 ++++++++++++------ .../spark/rapids/HostColumnarToGpu.scala | 10 +- .../spark/rapids/RmmRapidsRetryIterator.scala | 238 +++++++---- .../rapids/GpuCoalesceBatchesRetrySuite.scala | 375 +++++++++++++++++ 4 files changed, 798 insertions(+), 212 deletions(-) create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 311d160f06a6..52365fcee680 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -21,8 +21,9 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{Cuda, NvtxColor, Table} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode} - +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRetry, withRetryNoSplit} import org.apache.spark.TaskContext + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -58,7 +59,7 @@ object ConcatAndConsumeAll { * @param dataTypes the output types. * @return a single batch with all of them concated together. */ - def buildNonEmptyBatchFromTypes(arrayOfBatches: Array[ColumnarBatch], + def buildNonEmptyBatchFromTypes(arrayOfBatches: Seq[ColumnarBatch], dataTypes: Array[DataType]): ColumnarBatch = { if (arrayOfBatches.length == 1) { arrayOfBatches(0) @@ -171,6 +172,13 @@ sealed abstract class CoalesceSizeGoal extends CoalesceGoal { */ trait RequireSingleBatchLike +/** + * Trait used for pattern matching for goals that could be split, as they + * only specify that batches won't be too much bigger than a maximum target + * size in bytes. + */ +trait SplittableGoal + /** * A single batch is required as the input to a node in the SparkPlan. This means * all of the data for a given task is in a single batch. This should be avoided @@ -209,7 +217,9 @@ case class RequireSingleBatchWithFilter(filterExpression: GpuExpression) * limitations in cudf for nested type columns. * @param targetSizeBytes the size of each batch in bytes. */ -case class TargetSize(override val targetSizeBytes: Long) extends CoalesceSizeGoal { +case class TargetSize(override val targetSizeBytes: Long) + extends CoalesceSizeGoal + with SplittableGoal { require(targetSizeBytes <= Integer.MAX_VALUE, "Target cannot exceed 2GB without checks for cudf row count limit") } @@ -235,7 +245,7 @@ case class BatchedByKey(gpuOrder: Seq[SortOrder])(val cpuOrder: Seq[SortOrder]) } abstract class AbstractGpuCoalesceIterator( - batches: Iterator[ColumnarBatch], + inputIter: Iterator[ColumnarBatch], goal: CoalesceSizeGoal, numInputRows: GpuMetric, numInputBatches: GpuMetric, @@ -246,10 +256,16 @@ abstract class AbstractGpuCoalesceIterator( opTime: GpuMetric, opName: String) extends Iterator[ColumnarBatch] with Arm with Logging { - private val iter = new CollectTimeIterator(s"$opName: collect", batches, streamTime) + private val iter = new CollectTimeIterator(s"$opName: collect", inputIter, streamTime) private var batchInitialized: Boolean = false + /** + * This iterator is redefined if this coalesce iterator is under a `SplittableGoal` + * and so might retry and split given OOMs + */ + private var coalesceBatchIterator: Iterator[ColumnarBatch] = Iterator.empty + /** * This is defined iff `goal` is `RequireSingleBatchWithFilter` and we have * reached the cuDF row-count limit. @@ -288,7 +304,7 @@ abstract class AbstractGpuCoalesceIterator( Option(TaskContext.get()) .foreach(_.addTaskCompletionListener[Unit](_ => clearOnDeck())) - override def hasNext: Boolean = { + private def getHasOnDeck: Boolean = { while (!hasOnDeck && iter.hasNext) { val cb = iter.next() withResource(new MetricRange(opTime)) { _ => @@ -305,6 +321,10 @@ abstract class AbstractGpuCoalesceIterator( hasOnDeck } + override def hasNext: Boolean = { + coalesceBatchIterator.hasNext || getHasOnDeck + } + /** * Called first to initialize any state needed for a new batch to be created. */ @@ -326,6 +346,23 @@ abstract class AbstractGpuCoalesceIterator( */ def concatAllAndPutOnGPU(): ColumnarBatch + /** + * True for coalesce iterators that support an iterator that can retry + * and produce smaller batches on OOMs. + */ + protected val supportsRetryIterator: Boolean = true + + /** + * Function that returns a retry iterator that returns coalesced batches, as much + * as possible. + * + * Note this throws if the subclass does not support splitting its input. + * (supportsRetryIterator = false) + * + * @return an iterator that should be used to obtain coalesced batches + */ + def getCoalesceRetryIterator: Iterator[ColumnarBatch] + /** * Called to cleanup any state when a batch is done (even if there was a failure) */ @@ -354,135 +391,193 @@ abstract class AbstractGpuCoalesceIterator( } /** - * Each call to next() will combine incoming batches up to the limit specified - * by [[RapidsConf.GPU_BATCH_SIZE_BYTES]]. However, if any incoming batch is greater - * than this size it will be passed through unmodified. + * Add input batches to the `batches` collection up to the limit specified + * by the goal. Note: for a size goal, if any incoming batch is greater than this size + * it will be passed through unmodified. * * If the coalesce goal is `RequireSingleBatch` then an exception will be thrown if there - * is remaining data after the first batch is produced. + * is remaining data after the first batch is added. * - * @return The coalesced batch + * @note protected for testing + * @return boolean that is true if this call reached the last input batch. */ - override def next(): ColumnarBatch = withResource(new MetricRange(opTime)) { _ => - // reset batch state - batchInitialized = false - batchRowLimit = 0 - - try { - var numRows: Long = 0 // to avoid overflows - var numBytes: Long = 0 - - // check if there is a batch "on deck" from a previous call to next() - if (hasOnDeck) { - val batch = popOnDeck() - numRows += batch.numRows() - numBytes += getBatchDataSize(batch) - addBatch(batch) - } + protected def populateCandidateBatches(): Boolean = { + var numRows: Long = 0 // to avoid overflows + var numBytes: Long = 0 + + // check if there is a batch "on deck" from a previous call to next() + if (hasOnDeck) { + val batch = popOnDeck() + numRows += batch.numRows() + numBytes += getBatchDataSize(batch) + addBatch(batch) + } - // there is a hard limit of 2^31 rows - while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) { - val cbFromIter = iter.next() + // there is a hard limit of 2^31 rows + while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) { + val cbFromIter = iter.next() - var cb = if (inputFilterExpression.isDefined) { - // If we have reached the cuDF limit once, proactively filter batches - // after that first limit is reached. - GpuFilter(cbFromIter, inputFilterExpression.get) - } else { - cbFromIter - } + var cb = if (inputFilterExpression.isDefined) { + // If we have reached the cuDF limit once, proactively filter batches + // after that first limit is reached. + GpuFilter(cbFromIter, inputFilterExpression.get) + } else { + cbFromIter + } - closeOnExcept(cb) { _ => - val nextRows = cb.numRows() - numInputBatches += 1 + closeOnExcept(cb) { _ => + val nextRows = cb.numRows() + numInputBatches += 1 - // filter out empty batches - if (nextRows > 0) { - numInputRows += nextRows - val nextBytes = getBatchDataSize(cb) + // filter out empty batches + if (nextRows > 0) { + numInputRows += nextRows + val nextBytes = getBatchDataSize(cb) - // calculate the new sizes based on this input batch being added to the current - // output batch - val wouldBeRows = numRows + nextRows - val wouldBeBytes = numBytes + nextBytes + // calculate the new sizes based on this input batch being added to the current + // output batch + val wouldBeRows = numRows + nextRows + val wouldBeBytes = numBytes + nextBytes - if (wouldBeRows > Int.MaxValue) { - goal match { - case RequireSingleBatch => - throw new IllegalStateException("A single batch is required for this operation," + + if (wouldBeRows > Int.MaxValue) { + goal match { + case RequireSingleBatch => + throw new IllegalStateException("A single batch is required for this operation," + s" but cuDF only supports ${Int.MaxValue} rows. At least $wouldBeRows" + s" are in this partition. Please try increasing your partition count.") - case RequireSingleBatchWithFilter(filterExpression) => - // filter what we had already stored - val filteredDown = GpuFilter(concatAllAndPutOnGPU(), filterExpression) - closeOnExcept(filteredDown) { _ => - // filter the incoming batch as well - closeOnExcept(GpuFilter(cb, filterExpression)) { filteredCb => - cb = null // null out `cb` to prevent multiple close calls - val filteredWouldBeRows = filteredDown.numRows() + filteredCb.numRows() - if (filteredWouldBeRows > Int.MaxValue) { - throw new IllegalStateException( - "A single batch is required for this operation, but cuDF only supports " + + case RequireSingleBatchWithFilter(filterExpression) => + // filter what we had already stored + val filteredDown = GpuFilter(concatAllAndPutOnGPU(), filterExpression) + closeOnExcept(filteredDown) { _ => + // filter the incoming batch as well + closeOnExcept(GpuFilter(cb, filterExpression)) { filteredCb => + cb = null // null out `cb` to prevent multiple close calls + val filteredWouldBeRows = filteredDown.numRows() + filteredCb.numRows() + if (filteredWouldBeRows > Int.MaxValue) { + throw new IllegalStateException( + "A single batch is required for this operation, but cuDF only supports " + s"${Int.MaxValue} rows. At least $filteredWouldBeRows are in this " + "partition, even after filtering nulls. " + "Please try increasing your partition count.") - } - if (inputFilterExpression.isEmpty) { - inputFilterExpression = Some(filterExpression) - logWarning("Switched to null-filtering mode. This coalesce iterator " + + } + if (inputFilterExpression.isEmpty) { + inputFilterExpression = Some(filterExpression) + logWarning("Switched to null-filtering mode. This coalesce iterator " + "succeeded to fit rows under the cuDF limit only after null filtering. " + "Please try increasing your partition count.") - } - numRows = filteredWouldBeRows - numBytes = getBatchDataSize(filteredDown) + getBatchDataSize(filteredCb) - addBatch(filteredDown) - addBatch(filteredCb) } + numRows = filteredWouldBeRows + numBytes = getBatchDataSize(filteredDown) + getBatchDataSize(filteredCb) + addBatch(filteredDown) + addBatch(filteredCb) } - case _ => saveOnDeck(cb) // not a single batch requirement - } - } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) { - saveOnDeck(cb) - } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) { - // There are no explicit checks for the concatenate result exceeding the cudf 2^31 - // row count limit for any column. We are relying on cudf's concatenate to throw - // an exception if this occurs and limiting performance-oriented goals to under - // 2GB data total to avoid hitting that error. - saveOnDeck(cb) - } else { - addBatch(cb) - numRows = wouldBeRows - numBytes = wouldBeBytes + } + case _ => saveOnDeck(cb) // not a single batch requirement } + } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) { + saveOnDeck(cb) + } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) { + // There are no explicit checks for the concatenate result exceeding the cudf 2^31 + // row count limit for any column. We are relying on cudf's concatenate to throw + // an exception if this occurs and limiting performance-oriented goals to under + // 2GB data total to avoid hitting that error. + saveOnDeck(cb) } else { - cleanupInputBatch(cb) + addBatch(cb) + numRows = wouldBeRows + numBytes = wouldBeBytes } + } else { + cleanupInputBatch(cb) } } + } - val isLastBatch = !(hasOnDeck || iter.hasNext) + val isLastBatch = !(hasOnDeck || iter.hasNext) - // enforce single batch limit when appropriate - if (!isLastBatch) { - goal match { - case _: RequireSingleBatchLike => - throw new IllegalStateException("A single batch is required for this operation," + - " Please try increasing your partition count.") - case _ => - } + // enforce single batch limit when appropriate + if (!isLastBatch) { + goal match { + case _: RequireSingleBatchLike => + throw new IllegalStateException("A single batch is required for this operation," + + " Please try increasing your partition count.") + case _ => } + } + isLastBatch + } - numOutputRows += numRows - numOutputBatches += 1 - withResource(new NvtxWithMetrics(s"$opName concat", NvtxColor.CYAN, concatTime)) { _ => - val batch = concatAllAndPutOnGPU() - if (isLastBatch) { + var wasLastBatch: Boolean = false + + /** + * Each call to next() will combine batches according to the goal specified. + * However, if any incoming batch is greater than this size it will be passed + * through unmodified. + * + * If the coalesce goal is `RequireSingleBatch` then an exception will be thrown if there + * is remaining data after the first batch is produced. + * + * If OOMs occur while coalescing (which may include decompression depending on the + * instance), this may be retried, and as a result `ColumnarBatch` may be smaller than + * desired, since we follow a "coalesce half of the batches" strategy, which should + * half the number of batches that are candidates for coalesce at each OOM, leaving the rest + * for a subsequent call to `next`. + * + * @return The coalesced batch + */ + override def next(): ColumnarBatch = withResource(new MetricRange(opTime)) { _ => + if (coalesceBatchIterator.hasNext) { + val batch = coalesceBatchIterator.next() + if (wasLastBatch) { + // if the coalesce iterator is empty, and nothing is left on deck + if (!hasNext) { GpuColumnVector.tagAsFinalBatch(batch) + } // else, we already marked `wasLastBatch`, will check it again + // next time. + } + numOutputRows += batch.numRows() + numOutputBatches += 1 + batch + } else { + // reset batch state + batchInitialized = false + batchRowLimit = 0 + + val isLastBatch = if (!coalesceBatchIterator.hasNext) { + populateCandidateBatches() + } else { + wasLastBatch + } + + try { + withResource(new NvtxWithMetrics(s"$opName concat", NvtxColor.CYAN, concatTime)) { _ => + goal match { + case _: SplittableGoal if supportsRetryIterator => + coalesceBatchIterator = getCoalesceRetryIterator + val batch = coalesceBatchIterator.next() + if (isLastBatch) { + if (!hasNext) { + GpuColumnVector.tagAsFinalBatch(batch) + } else { + wasLastBatch = true // but couldn't mark this one because there are leftovers + } + } + numOutputRows += batch.numRows() + numOutputBatches += 1 + batch + case _ => + val singleBatch = concatAllAndPutOnGPU() + if (isLastBatch) { + GpuColumnVector.tagAsFinalBatch(singleBatch) + } + numOutputRows += singleBatch.numRows() + numOutputBatches += 1 + singleBatch + } } - batch + } finally { + cleanupConcatIsDone() } - } finally { - cleanupConcatIsDone() } } @@ -493,6 +588,27 @@ abstract class AbstractGpuCoalesceIterator( } addBatchToConcat(batch) } + + protected def splitBatchesToCoalesceFn: BatchesToCoalesce => Seq[BatchesToCoalesce] = { + (batchesToCoalesce: BatchesToCoalesce) => { + closeOnExcept(batchesToCoalesce) { _ => + val it = batchesToCoalesce.batches + val numBatches = it.length + if (numBatches <= 1) { + throw new OutOfMemoryError(s"Cannot split a sequence of $numBatches batches") + } + val res = it.splitAt(numBatches / 2) + Seq(BatchesToCoalesce(res._1), BatchesToCoalesce(res._2)) + } + } + } +} + +case class BatchesToCoalesce(batches: Seq[SpillableColumnarBatch]) + extends AutoCloseable { + override def close(): Unit = { + batches.safeClose() + } } class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], @@ -520,7 +636,8 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], opName) with Arm { protected val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty - private var maxDeviceMemory: Long = 0 + + protected var maxDeviceMemory: Long = 0 override def initNewBatch(batch: ColumnarBatch): Unit = { batches.safeClose() @@ -531,21 +648,30 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY, spillCallback)) - protected def popAll(): Array[ColumnarBatch] = { - closeOnExcept(batches.toArray.safeMap(_.getColumnarBatch())) { wip => - batches.safeClose() - batches.clear() - wip - } - } - - override def concatAllAndPutOnGPU(): ColumnarBatch = { - val ret = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(popAll(), sparkTypes) + private def concatBatches(batches: Seq[SpillableColumnarBatch]): ColumnarBatch = { + val wip = batches.safeMap(_.getColumnarBatch()) + val ret = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(wip, sparkTypes) // sum of current batches and concatenating batches. Approximately sizeof(ret * 2). maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2 ret } + override def concatAllAndPutOnGPU(): ColumnarBatch = { + val candidates = batches.clone() + batches.clear() + withRetryNoSplit(candidates) { attempt => + concatBatches(attempt) + } + } + + override def getCoalesceRetryIterator: Iterator[ColumnarBatch] = { + val candidates = BatchesToCoalesce(batches.clone()) + batches.clear() + withRetry(candidates, splitBatchesToCoalesceFn) { attempt: BatchesToCoalesce => + concatBatches(attempt.batches) + } + } + override def cleanupConcatIsDone(): Unit = { peakDevMemory.set(maxDeviceMemory) batches.clear() @@ -611,11 +737,8 @@ class GpuCompressionAwareCoalesceIterator( private[this] var codec: TableCompressionCodec = _ - override protected def popAll(): Array[ColumnarBatch] = { - closeOnExcept(batches.toArray.safeMap(_.getColumnarBatch())) { wip => - batches.safeClose() - batches.clear() - + private def concatBatches(batches: Array[SpillableColumnarBatch]): ColumnarBatch = { + closeOnExcept(batches.safeMap(_.getColumnarBatch())) { wip => val compressedBatchIndices = wip.zipWithIndex.filter { pair => GpuCompressedColumnVector.isBatchCompressed(pair._1) }.map(_._2) @@ -628,7 +751,7 @@ class GpuCompressionAwareCoalesceIterator( codec = TableCompressionCodec.getCodec(descr.codec, codecConfigs) } withResource(codec.createBatchDecompressor(maxDecompressBatchMemory, - Cuda.DEFAULT_STREAM)) { decompressor => + Cuda.DEFAULT_STREAM)) { decompressor => compressedVecs.foreach { cv => val buffer = cv.getTableBuffer val bufferMeta = cv.getTableMeta.bufferMeta @@ -642,13 +765,31 @@ class GpuCompressionAwareCoalesceIterator( val batchIndex = compressedBatchIndices(outputIndex) val compressedBatch = wip(batchIndex) wip(batchIndex) = - MetaUtils.getBatchFromMeta(outputBuffer, cv.getTableMeta, sparkTypes) + MetaUtils.getBatchFromMeta(outputBuffer, cv.getTableMeta, sparkTypes) compressedBatch.close() } } } } - wip + val onGPU = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(wip, sparkTypes) + maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(onGPU) * 2 + onGPU + } + } + + override def concatAllAndPutOnGPU(): ColumnarBatch = { + val candidates = batches.clone() + batches.clear() + withRetryNoSplit(candidates) { attempt => + concatBatches(attempt.toArray) + } + } + + override def getCoalesceRetryIterator(): Iterator[ColumnarBatch] = { + val candidates = BatchesToCoalesce(batches.clone()) + batches.clear() + withRetry(candidates, splitBatchesToCoalesceFn) { attempt: BatchesToCoalesce => + concatBatches(attempt.batches.toArray) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala index 8cdbb119fba2..8006766c9e8f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -269,10 +269,16 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch], // refine the estimate for number of rows based on this batch batchRowLimit = GpuBatchUtils.estimateRowCount(goal.targetSizeBytes, maxDeviceMemory, ret.numRows()) - ret } + override val supportsRetryIterator: Boolean = false + + override def getCoalesceRetryIterator: Iterator[ColumnarBatch] = { + throw new UnsupportedOperationException( + "HostColumnarToGpu iterator does not support retry iterators") + } + override def cleanupConcatIsDone(): Unit = { if (batchBuilder != null) { batchBuilder.close() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala index 5a82db68d7d6..599ebac0b16c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala @@ -54,7 +54,8 @@ object RmmRapidsRetryIterator extends Arm { input: Iterator[T], splitPolicy: T => Seq[T]) (fn: T => K): Iterator[K] = { - new RmmRapidsRetryAutoCloseableIterator(input, fn, splitPolicy) + val attemptIter = new AutoCloseableAttemptSpliterator(input, fn, splitPolicy) + new RmmRapidsRetryAutoCloseableIterator(attemptIter) } /** @@ -86,10 +87,9 @@ object RmmRapidsRetryIterator extends Arm { input: T, splitPolicy: T => Seq[T]) (fn: T => K): Iterator[K] = { - new RmmRapidsRetryAutoCloseableIterator( - SingleItemAutoCloseableIteratorInternal(input), - fn, - splitPolicy) + val attemptIter = new AutoCloseableAttemptSpliterator( + SingleItemAutoCloseableIteratorInternal(input), fn, splitPolicy) + new RmmRapidsRetryAutoCloseableIterator(attemptIter) } /** @@ -117,10 +117,10 @@ object RmmRapidsRetryIterator extends Arm { def withRetryNoSplit[T <: AutoCloseable, K]( input: T) (fn: T => K): K = { + val attemptIter = new AutoCloseableAttemptSpliterator( + SingleItemAutoCloseableIteratorInternal(input), fn) drainSingleWithVerification( - new RmmRapidsRetryAutoCloseableIterator( - SingleItemAutoCloseableIteratorInternal(input), - fn)) + new RmmRapidsRetryAutoCloseableIterator(attemptIter)) } /** @@ -149,10 +149,31 @@ object RmmRapidsRetryIterator extends Arm { input: Seq[T]) (fn: Seq[T] => K): K = { val wrapped = AutoCloseableSeqInternal(input) + val attemptIter = new AutoCloseableAttemptSpliterator( + SingleItemAutoCloseableIteratorInternal(wrapped), fn) drainSingleWithVerification( - new RmmRapidsRetryAutoCloseableIterator( - SingleItemAutoCloseableIteratorInternal(wrapped), - fn)) + new RmmRapidsRetryAutoCloseableIterator(attemptIter)) + } + + /** + * no-input withRetryNoSplit. This helper calls a function `fn` retrying the call if needed. + * The result is a single item of type K. + * + * The expectation when code enters `withRetryNoSplit` is that all of the caller's data is + * spillable already, allowing the thread to be blocked, and its data eventually spilled + * because of other higher priority work. + * + * `fn` must be idempotent: this is a requirement because we may call `fn` multiple times + * while handling retries. + * + * @param fn the work to perform. It is a function that takes nothing and produces K + * @tparam K `fn` result type + * @return a single item of type K + */ + def withRetryNoSplit[K](fn: => K): K = { + val attemptIter = new NoInputSpliterator(fn) + drainSingleWithVerification( + new RmmRapidsRetryAutoCloseableIterator(attemptIter)) } /** @@ -195,28 +216,71 @@ object RmmRapidsRetryIterator extends Arm { private case class SingleItemAutoCloseableIteratorInternal[T <: AutoCloseable](ts: T) extends Iterator[T] with AutoCloseable { - private var wasCalled = false - override def hasNext: Boolean = !wasCalled + private var wasCalledSuccessfully = false + override def hasNext: Boolean = !wasCalledSuccessfully override def next(): T = { - wasCalled = true + wasCalledSuccessfully = true ts } override def close(): Unit = { - if (!wasCalled) { + if (!wasCalledSuccessfully) { ts.close() } } } /** - * RmmRapidsRetryAutoCloseableIterator exposes an iterator that can retry work, - * specified by `fn`, abstracting away the retry specifics. Elements passed to this iterator - * must be AutoCloseable. + * A trait that defines an iterator of type K that supports two extra things: + * the ability to split its input, and the ability to close itself. + * + * Note that the input's type is not defined and is not relevant to this trait. + * + * @tparam K the resulting type + */ + trait Spliterator[K] extends Iterator[K] with AutoCloseable { + override def hasNext: Boolean + + def split(): Unit + + override def next(): K + + override def close(): Unit + } + + /** + * A spliterator that doesn't take any inputs, hence it is "empty", and it doesn't know + * how to split. It allows the caller to call the function `fn` once on `next`. + * @param fn the work to perform. It is a function that takes nothing and produces K + * @tparam K the resulting type + */ + class NoInputSpliterator[K](fn: => K) extends Spliterator[K] { + private var wasCalledSuccessfully: Boolean = false + + override def hasNext: Boolean = !wasCalledSuccessfully + + override def split(): Unit = { + throw new OutOfMemoryError( + "Attempted to handle a split, but was not initialized with a splitPolicy.") + } + + override def next(): K = { + val res = fn + wasCalledSuccessfully = true + res + } + + override def close(): Unit = {} + } + + /** + * A spliterator that takes an input iterator of auto closeable T, and a function `fn` + * that can map `T` to `K`, with an additional `splitPolicy` that can split `T` into a + * `Seq[T]` * - * It assumes the type K is AutoCloseable, and that if a split policy is specified, that it - * is capable of handling splitting one K into a sequence of them. + * It assumes the type T is AutoCloseable, and that if a split policy is specified, that it + * is capable of handling splitting one T into a sequence of them. * - * When an attempt to invoke function `fn` is successful, the item K in `input` will be + * When an attempt to invoke function `fn` is successful, the item T in `input` will be * closed. In the case of a failure, all attempts will be closed. It is the responsibility * of the caller to close any remaining items in `input` that have not been attempted. * @@ -230,25 +294,67 @@ object RmmRapidsRetryIterator extends Arm { * @param splitPolicy a function that can split an item of type T into a Seq[T]. The split * function must close the item passed to it. */ - class RmmRapidsRetryAutoCloseableIterator[T <: AutoCloseable, K]( + class AutoCloseableAttemptSpliterator[T <: AutoCloseable, K]( input: Iterator[T], fn: T => K, splitPolicy: T => Seq[T]) - extends RmmRapidsRetryIterator(input, fn, splitPolicy) - with Arm { - - def this( - input: Iterator[T], - fn: T => K) = { + extends Iterator[K] + with Spliterator[K] { + def this(input: Iterator[T], fn: T => K) = this(input, fn, null) + + protected val attemptStack = new mutable.ArrayStack[T]() + + override def hasNext: Boolean = input.hasNext || attemptStack.nonEmpty + + override def split(): Unit = { + // If `split` OOMs, we are already the last thread standing + // there is likely not much we can do, and for now we don't handle + // this OOM + if (splitPolicy == null) { + throw new OutOfMemoryError( + "Attempted to handle a split, but was not initialized with a splitPolicy.") + } + // splitPolicy must take ownership of the argument + val splitted = splitPolicy(attemptStack.pop()) + // the splitted sequence needs to be inserted in reverse order + // so we try the first item first. + splitted.reverse.foreach(attemptStack.push) } - override def invokeFn(k: T): K = { - val res = super.invokeFn(k) - k.close() // close `k` only if we didn't throw from `invokeFn` + override def next(): K = { + if (attemptStack.isEmpty && input.hasNext) { + attemptStack.push(input.next()) + } + val popped = attemptStack.head + val res = fn(popped) + attemptStack.pop().close() res } + override def close(): Unit = { + attemptStack.safeClose() + attemptStack.clear() + } + } + + /** + * RmmRapidsRetryAutoCloseableIterator exposes an iterator that can retry work, + * specified by `fn`, abstracting away the retry specifics. Elements passed to this iterator + * must be AutoCloseable. + * + * It assumes the type T is AutoCloseable, and that if a split policy is specified, that it + * is capable of handling splitting one T into a sequence of them. + * + * @tparam T element type that must be AutoCloseable + * @tparam K result type + * @param attemptIter an iterator of T + */ + class RmmRapidsRetryAutoCloseableIterator[T <: AutoCloseable, K]( + attemptIter: Spliterator[K]) + extends RmmRapidsRetryIterator[T, K](attemptIter) + with Arm { + override def hasNext: Boolean = super.hasNext override def next(): K = { @@ -261,7 +367,7 @@ object RmmRapidsRetryIterator extends Arm { case t: Throwable => // exception occurred while trying to handle this retry // we close our attempts (which includes the item we last attempted) - attemptStack.safeClose(t) + attemptIter.close() throw t } } @@ -273,25 +379,13 @@ object RmmRapidsRetryIterator extends Arm { * * @tparam T element type * @tparam K `fn` result type - * @param input an iterator of T - * @param fn a function that takes T and produces K - * @param splitPolicy an optional function that can split T into a Seq[T], if provided - * `splitPolicy` must take ownership of items passed to it. + * @param attemptIter an iterator of T */ - class RmmRapidsRetryIterator[T, K]( - input: Iterator[T], - fn: T => K, - splitPolicy: T => Seq[T]) extends Iterator[K] with Arm { - def this(input: Iterator[T], fn: T => K) = - this(input, fn, null) + class RmmRapidsRetryIterator[T, K](attemptIter: Spliterator[K]) + extends Iterator[K] + with Arm { - protected val attemptStack = new mutable.ArrayStack[T]() - - override def hasNext: Boolean = input.hasNext || attemptStack.nonEmpty - - protected def invokeFn(k: T): K = { - fn(k) - } + override def hasNext: Boolean = attemptIter.hasNext override def next(): K = { // this is set on the first exception, and we add suppressed if there are others @@ -300,75 +394,45 @@ object RmmRapidsRetryIterator extends Arm { var firstAttempt: Boolean = true var result: Option[K] = None var doSplit = false - while (result.isEmpty && (attemptStack.nonEmpty || input.hasNext)) { - if (attemptStack.isEmpty && input.hasNext) { - attemptStack.push(input.next()) - } + while (result.isEmpty && attemptIter.hasNext) { if (!firstAttempt) { // call thread block API RmmSpark.blockThreadUntilReady() } firstAttempt = false - val popped = attemptStack.pop() - val attempt = - if (doSplit) { - // If `split` OOMs, we are already the last thread standing - // there is likely not much we can do, and for now we don't handle - // this OOM - val splitted = splitAndClose(popped) - // the splitted sequence needs to be inserted in reverse order - // so we try the first item first. - splitted.reverse.foreach(attemptStack.push) - attemptStack.pop() - } else { - popped - } + if (doSplit) { + attemptIter.split() + } doSplit = false try { // call the user's function - result = Some(invokeFn(attempt)) + result = Some(attemptIter.next()) } catch { case retryOOM: RetryOOM => if (lastException != null) { retryOOM.addSuppressed(lastException) } lastException = retryOOM - // put it back - attemptStack.push(attempt) case splitAndRetryOOM: SplitAndRetryOOM => // we are the only thread if (lastException != null) { splitAndRetryOOM.addSuppressed(lastException) } lastException = splitAndRetryOOM - // put it back - attemptStack.push(attempt) doSplit = true case other: Throwable => if (lastException != null) { other.addSuppressed(lastException) } lastException = other - // put this attempt back on our stack, so that it will be closed - attemptStack.push(attempt) - // we want to throw early here, since we got an exception // we were not prepared to handle throw lastException } } - result.get - } - - // It is assumed that OOM in this function is not handled. - private def splitAndClose(item:T): Seq[T] = { - if (splitPolicy == null) { - // put item into the attempt stack, to be closed - attemptStack.push(item) - throw new OutOfMemoryError( - "Attempted to handle a split, but was not initialized with a splitPolicy.") + if (result.isEmpty) { + throw lastException } - // splitPolicy must close `item` - splitPolicy(item) + result.get } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala new file mode 100644 index 000000000000..103a59388d9b --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf.{Rmm, RmmAllocationMode, RmmEventHandler, Table} +import com.nvidia.spark.rapids.jni.{RetryOOM, RmmSpark, SplitAndRetryOOM} +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterEach, FunSuite} +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch + +class GpuCoalesceBatchesRetrySuite + extends FunSuite + with BeforeAndAfterEach with MockitoSugar with Arm { + + private var rmmWasInitialized = false + override def beforeEach(): Unit = { + SparkSession.getActiveSession.foreach(_.stop()) + SparkSession.clearActiveSession() + if (!Rmm.isInitialized) { + rmmWasInitialized = true + Rmm.initialize(RmmAllocationMode.CUDA_DEFAULT, null, 512 * 1024 * 1024) + } + val deviceStorage = new RapidsDeviceMemoryStore() + val catalog = new RapidsBufferCatalog(deviceStorage) + RapidsBufferCatalog.setCatalog(catalog) + val mockEventHandler = new BaseRmmEventHandler() + RmmSpark.setEventHandler(mockEventHandler) + RmmSpark.associateThreadWithTask(RmmSpark.getCurrentThreadId, 1) + } + + override def afterEach(): Unit = { + RmmSpark.removeThreadAssociation(RmmSpark.getCurrentThreadId) + RmmSpark.clearEventHandler() + RapidsBufferCatalog.close() + if (rmmWasInitialized) { + Rmm.shutdown() + } + } + + private def buildBatchesToCoalesce(): Seq[ColumnarBatch] = { + (0 until 10).map { _ => + withResource(new Table.TestBuilder() + .column(1L.asInstanceOf[java.lang.Long]) + .build()) { tbl => + spy(GpuColumnVector.from(tbl, Seq(LongType).toArray[DataType])) + } + } + } + + private def buildHostBatchesToCoalesce(): Seq[ColumnarBatch] = { + buildBatchesToCoalesce().map { dcb => + withResource(dcb) { _ => + val hostColumns = (0 until dcb.numCols()).map( + i => dcb.column(i).asInstanceOf[GpuColumnVector].copyToHost()) + new ColumnarBatch(hostColumns.toArray, dcb.numRows()) + } + } + } + + /** + * Get the GpuCoalesceIterator, GpuCompressionAwareCoalesceIterator and + * optionally the HostToGpuCoalesceIterator. + * + * The last iterator is optional because it doesn't support SplitAndRetryOOM + * @param injectRetry number of retries to inject + * @param injectSplitAndRetry number of split and retries to inject + * @param mockInjectSplitAndRetry mock a SplitAndRetryOOM (on + * SpillableColumnarBatch.getColumnarBatch) + * @param includeHost whether to also include HostToGpuCoalesceIterator + * @return a sequence of Iterator[ColumnarBatch] + */ + def getIters( + goal: CoalesceSizeGoal = TargetSize(1024), + injectRetry: Int = 0, + injectSplitAndRetry: Int = 0, + mockInjectSplitAndRetry: Boolean = false): Seq[Iterator[ColumnarBatch]] = { + val ab = new ArrayBuffer[Iterator[ColumnarBatch]]() + ab.append(new InjectableCoalesceIterator( + buildBatchesToCoalesce(), + goal, + injectRetry, + injectSplitAndRetry, + mockInjectSplitAndRetry)) + ab.append(new InjectableCompressionAwareCoalesceIterator( + buildBatchesToCoalesce(), + goal, + injectRetry, + injectSplitAndRetry, + mockInjectSplitAndRetry)) + ab + } + + def getHostIter( + injectRetry: Int = 0, + goal: CoalesceSizeGoal = TargetSize(1024)): Iterator[ColumnarBatch] = { + new InjectableHostToGpuCoalesceIterator( + buildHostBatchesToCoalesce(), + goal = goal, + injectRetry) + } + + test("coalesce gpu batches without failures") { + val iters = getIters() ++ Seq(getHostIter()) + iters.foreach { iter => + withResource(iter.next()) { coalesced => + assertResult(10)(coalesced.numRows()) + assertResult(true)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + } + } + + test("coalesce gpu batches with retry") { + val iters = getIters(injectRetry = 1) + iters.foreach { iter => + withResource(iter.next()) { coalesced => + assertResult(10)(coalesced.numRows()) + assertResult(true)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + } + } + + // this is a placeholder test. The HostToGpuCoalesceIterator is going to + // need a change in cuDF to make it retriable, so we are asserting here + // that the exception we could handle `RetryOOM` is being thrown. + test("coalesce gpu batches with retry host iter") { + val iter = getHostIter(injectRetry = 1) + assertThrows[RetryOOM] { + withResource(iter.next()) { coalesced => + assertResult(10)(coalesced.numRows()) + } + } + } + + test("coalesce gpu batches splits in half with SplitAndRetryOOM") { + val iters = getIters(injectSplitAndRetry = 1) + iters.foreach { iter => + withResource(iter.next()) { coalesced => + assertResult(5)(coalesced.numRows()) + assertResult(false)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + withResource(iter.next()) { coalesced => + assertResult(5)(coalesced.numRows()) + assertResult(true)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + assertResult(false)(iter.hasNext) + } + } + + test("coalesce gpu batches splits in quarters with SplitAndRetryOOM") { + val iters = getIters(injectSplitAndRetry = 2) + iters.foreach { iter => + withResource(iter.next()) { coalesced => + assertResult(2)(coalesced.numRows()) + assertResult(false)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + withResource(iter.next()) { coalesced => + assertResult(3)(coalesced.numRows()) + assertResult(false)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + withResource(iter.next()) { coalesced => + assertResult(5)(coalesced.numRows()) + assertResult(true)(GpuColumnVector.isTaggedAsFinalBatch(coalesced)) + } + assertResult(false)(iter.hasNext) + } + } + + test("coalesce gpu batches fails with OOM if it cannot split enough") { + val iters = getIters(mockInjectSplitAndRetry = true) + iters.foreach { iter => + assertThrows[OutOfMemoryError] { + iter.next() // throws + } + val batches = iter.asInstanceOf[GpuCoalesceIteratorMocks].getBatches() + assertResult(10)(batches.length) + batches.foreach(b => + verify(b, times(1)).close() + ) + } + } + + test("coalesce gpu batches with retry with non-splittable goal") { + val iters = getIters(injectRetry = 1, goal = RequireSingleBatch) + iters.foreach { iter => + withResource(iter.next()) { coalesced => + assertResult(10)(coalesced.numRows()) + } + } + } + + test("coalesce gpu batches throws if SplitAndRetryOOM with non-splittable goal") { + val iters = getIters(injectSplitAndRetry = 1, goal = RequireSingleBatch) + iters.foreach { iter => + assertThrows[OutOfMemoryError] { + iter.next() + } + val batches = iter.asInstanceOf[GpuCoalesceIteratorMocks].getBatches() + assertResult(10)(batches.length) + batches.foreach(b => + verify(b, times(1)).close() + ) + } + } + + class SpillableColumnarBatchThatThrows(batch: ColumnarBatch) + extends SpillableColumnarBatch { + override def getSpillCallback: SpillCallback = + RapidsBuffer.defaultSpillCallback + override def numRows(): Int = 0 + override def setSpillPriority(priority: Long): Unit = {} + override def getColumnarBatch(): ColumnarBatch = { + throw new SplitAndRetryOOM() + } + override def sizeInBytes: Long = 0 + override def dataTypes: Array[DataType] = Array.empty + override def close(): Unit = batch.close() + } + + trait GpuCoalesceIteratorMocks { + def getBatches(): Seq[ColumnarBatch] + + def injectError(injectRetry: Int, injectSplitAndRetry: Int): Unit = { + if (injectRetry > 0) { + RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, injectRetry) + } + if (injectSplitAndRetry > 0) { + RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, injectSplitAndRetry) + } + } + + def getBatchToConcat( + mockInjectSplitAndRetry: Boolean, + batch: ColumnarBatch): SpillableColumnarBatch = { + val spillableSpy = if (mockInjectSplitAndRetry) { + spy( + new SpillableColumnarBatchThatThrows(batch)) + } else { + spy(SpillableColumnarBatch( + batch, + SpillPriorities.ACTIVE_BATCHING_PRIORITY, + RapidsBuffer.defaultSpillCallback)) + } + spillableSpy + } + } + + class InjectableHostToGpuCoalesceIterator( + batchesToConcat: Seq[ColumnarBatch], + goal: CoalesceSizeGoal, + injectRetry: Int = 0) + extends HostToGpuCoalesceIterator( + batchesToConcat.iterator, + goal, + StructType(Seq(StructField("col0", LongType, nullable = true))), + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + "test", + false) + with GpuCoalesceIteratorMocks { + + override def populateCandidateBatches(): Boolean = { + val lastBatchTag = super.populateCandidateBatches() + injectError(injectRetry, injectSplitAndRetry = 0) + lastBatchTag + } + + override def getBatches(): Seq[ColumnarBatch] = batchesToConcat + } + + class InjectableCompressionAwareCoalesceIterator( + batchesToConcat: Seq[ColumnarBatch], + goal: CoalesceSizeGoal, + injectRetry: Int = 0, + injectSplitAndRetry: Int = 0, + mockInjectSplitAndRetry: Boolean = false) + extends GpuCompressionAwareCoalesceIterator( + batchesToConcat.iterator, + Seq(LongType).toArray, + goal, + maxDecompressBatchMemory=10240, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + RapidsBuffer.defaultSpillCallback, + "test", + TableCompressionCodecConfig(1024)) with GpuCoalesceIteratorMocks { + override def populateCandidateBatches(): Boolean = { + val lastBatchTag = super.populateCandidateBatches() + injectError(injectRetry, injectSplitAndRetry) + lastBatchTag + } + + override def addBatchToConcat(batch: ColumnarBatch): Unit = { + batches.append(getBatchToConcat(mockInjectSplitAndRetry, batch)) + } + + override def getBatches(): Seq[ColumnarBatch] = batchesToConcat + } + + class InjectableCoalesceIterator( + batchesToConcat: Seq[ColumnarBatch], + goal: CoalesceSizeGoal, + injectRetry: Int = 0, + injectSplitAndRetry: Int = 0, + mockInjectSplitAndRetry: Boolean = false) + extends GpuCoalesceIterator ( + batchesToConcat.iterator, + Seq(LongType).toArray, + goal, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + NoopMetric, + RapidsBuffer.defaultSpillCallback, + "test") with GpuCoalesceIteratorMocks { + override def populateCandidateBatches(): Boolean = { + val lastBatchTag = super.populateCandidateBatches() + injectError(injectRetry, injectSplitAndRetry) + lastBatchTag + } + + override def addBatchToConcat(batch: ColumnarBatch): Unit = { + batches.append(getBatchToConcat(mockInjectSplitAndRetry, batch)) + } + + override def getBatches(): Seq[ColumnarBatch] = batchesToConcat + } + + private class BaseRmmEventHandler extends RmmEventHandler { + override def getAllocThresholds: Array[Long] = null + override def getDeallocThresholds: Array[Long] = null + override def onAllocThreshold(totalAllocSize: Long): Unit = {} + override def onDeallocThreshold(totalAllocSize: Long): Unit = {} + override def onAllocFailure(sizeRequested: Long, retryCount: Int): Boolean = { + false + } + } +}