Skip to content

Commit

Permalink
Retry with smaller split on CudfColumnSizeOverflowException
Browse files Browse the repository at this point in the history
Depends on rapidsai/cudf#13911.

When a CUDF operation causes a column's size to exceed the valid range
for CUDF columns (i.e. cudf::size_type), CUDF will throw an exception.

Prior to this commit, the `RmmRapidsRetryIterator` does not attempt retries
with smaller splits, in this case. Instead, the overflow is treated as
a generic exception.

This commit allows the RmmRapidsRetryIterator to recognize the exception
specific to the overflow case (i.e. `CudfColumnSizeOverflowException`),
and attempt a split-retry.

Note: This error condition is difficult to reproduce. The catch/retry is
a "best effort" attempt not to fail the entire task.

Signed-off-by: MithunR <[email protected]>
  • Loading branch information
mythrocks committed Aug 24, 2023
1 parent 6729888 commit 5298e92
Showing 1 changed file with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.nvidia.spark.rapids

import scala.annotation.tailrec
import scala.collection.mutable

import ai.rapids.cudf.CudfColumnSizeOverflowException
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
Expand Down Expand Up @@ -212,6 +214,14 @@ object RmmRapidsRetryIterator extends Logging {
(causedByRetry, causedBySplit)
}

private def isColumnSizeOverflow(ex: Throwable): Boolean =
ex.isInstanceOf[CudfColumnSizeOverflowException]

@tailrec
private def isOrCausedByColumnSizeOverflow(ex: Throwable): Boolean = {
ex != null && (isColumnSizeOverflow(ex) || isOrCausedByColumnSizeOverflow(ex.getCause))
}

/**
* withRestoreOnRetry for CheckpointRestore. This helper function calls `fn` with no input and
* returns the result. In the event of an OOM Retry exception, it calls the restore() method
Expand All @@ -232,7 +242,7 @@ object RmmRapidsRetryIterator extends Logging {
case ex: Throwable =>
// Only restore on retry exceptions
val (topLevelIsRetry, _) = isRetryOrSplitAndRetry(ex)
if (topLevelIsRetry || causedByRetryOrSplit(ex)._1) {
if (topLevelIsRetry || causedByRetryOrSplit(ex)._1 || isOrCausedByColumnSizeOverflow(ex)) {
r.restore()
}
throw ex
Expand All @@ -259,7 +269,7 @@ object RmmRapidsRetryIterator extends Logging {
case ex: Throwable =>
// Only restore on retry exceptions
val (topLevelIsRetry, _) = isRetryOrSplitAndRetry(ex)
if (topLevelIsRetry || causedByRetryOrSplit(ex)._1) {
if (topLevelIsRetry || causedByRetryOrSplit(ex)._1 || isOrCausedByColumnSizeOverflow(ex)) {
r.foreach(_.restore())
}
throw ex
Expand Down Expand Up @@ -580,9 +590,14 @@ object RmmRapidsRetryIterator extends Logging {
lastException = ex

if (!topLevelIsRetry && !causedByRetry) {
// we want to throw early here, since we got an exception
// we were not prepared to handle
throw lastException
if (isOrCausedByColumnSizeOverflow(ex)) {
// CUDF column size overflow? Attempt split-retry.
doSplit = true
} else {
// we want to throw early here, since we got an exception
// we were not prepared to handle
throw lastException
}
}
// else another exception wrapped a retry. So we are going to try again
}
Expand Down

0 comments on commit 5298e92

Please sign in to comment.