Skip to content

Commit

Permalink
Print out the current attempt object when OOM inside a retry block (#…
Browse files Browse the repository at this point in the history
…11733)

closes #11732

This PR adds the support to print out the current attempt object being processed
when OOM happens in the retry block.
This is designed for the better OOM issues triage.
---------

Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman authored Nov 25, 2024
1 parent daaaf24 commit 6cba00d
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -690,6 +690,11 @@ case class BatchesToCoalesce(batches: Array[SpillableColumnarBatch])
override def close(): Unit = {
batches.safeClose()
}

override def toString: String = {
val totalSize = batches.map(_.sizeInBytes).sum
s"BatchesToCoalesce totalSize:$totalSize, batches:[${batches.mkString(";")}]"
}
}

class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,9 @@ class BatchToGenerate(val fixUpOffset: Long, val spillable: SpillableColumnarBat
override def close(): Unit = {
spillable.close()
}

override def toString: String =
s"BatchToGenerate fixUpOffset:$fixUpOffset, spillable:$spillable"
}

class GpuGenerateIterator(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -311,6 +311,14 @@ object RmmRapidsRetryIterator extends Logging {
override def iterator: Iterator[T] = ts.iterator

override def apply(idx: Int): T = ts.apply(idx)

override def toString(): String = {
val totalSize = ts.map {
case scb: SpillableColumnarBatch => scb.sizeInBytes
case _ => 0L
}.sum
s"AutoCloseableSeqInternal totalSize:$totalSize, inner:[${ts.mkString(";")}]"
}
}

/**
Expand Down Expand Up @@ -454,14 +462,42 @@ object RmmRapidsRetryIterator extends Logging {
// there is likely not much we can do, and for now we don't handle
// this OOM
if (splitPolicy == null) {
val message = s"could not split inputs and retry. The current attempt: " +
s"{${attemptStack.head}}"
if (isFromGpuOom) {
throw new GpuSplitAndRetryOOM("GPU OutOfMemory: could not split inputs and retry")
throw new GpuSplitAndRetryOOM(s"GPU OutOfMemory: $message")
} else {
throw new CpuSplitAndRetryOOM("CPU OutOfMemory: could not split inputs and retry")
throw new CpuSplitAndRetryOOM(s"CPU OutOfMemory: $message")
}
}
// splitPolicy must take ownership of the argument
val splitted = splitPolicy(attemptStack.pop())
val curAttempt = attemptStack.pop()
// Get the info before running the split, since the attempt may be closed after splitting.
val attemptAsString = closeOnExcept(curAttempt)(_.toString)
val splitted = try {
// splitPolicy must take ownership of the argument
splitPolicy(curAttempt)
} catch {
// We only care about OOM exceptions and wrap it by a new exception with the
// same type to provide more context for the OOM.
// This looks a little odd, because we can not change the type of root exception.
// Otherwise, some unit tests will fail due to the wrong exception type returned.
case go: GpuRetryOOM =>
throw new GpuRetryOOM(
s"GPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
).initCause(go)
case go: GpuSplitAndRetryOOM =>
throw new GpuSplitAndRetryOOM(
s"GPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
).initCause(go)
case co: CpuRetryOOM =>
throw new CpuRetryOOM(
s"CPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
).initCause(co)
case co: CpuSplitAndRetryOOM =>
throw new CpuSplitAndRetryOOM(
s"CPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
).initCause(co)
}
// the splitted sequence needs to be inserted in reverse order
// so we try the first item first.
splitted.reverse.foreach(attemptStack.push)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ trait SpillableColumnarBatch extends AutoCloseable {
def sizeInBytes: Long

def dataTypes: Array[DataType]

override def toString: String =
s"SCB size:$sizeInBytes, types:${dataTypes.toList}, rows:${numRows()}"
}

/**
Expand All @@ -79,6 +82,8 @@ class JustRowsColumnarBatch(numRows: Int)

// There is no off heap data and close is a noop so just return this
override def incRefCount(): SpillableColumnarBatch = this

override def toString: String = s"JustRowsSCB size:$sizeInBytes, rows:$numRows"
}

/**
Expand Down Expand Up @@ -148,7 +153,8 @@ class SpillableColumnarBatchImpl (
}

override def toString: String =
s"SCB $handle $rowCount ${sparkTypes.toList} $refCount"
s"GpuSCB size:$sizeInBytes, handle:$handle, rows:$rowCount, types:${sparkTypes.toList}," +
s" refCount:$refCount"
}

class JustRowsHostColumnarBatch(numRows: Int)
Expand All @@ -167,6 +173,8 @@ class JustRowsHostColumnarBatch(numRows: Int)

// There is no off heap data and close is a noop so just return this
override def incRefCount(): SpillableColumnarBatch = this

override def toString: String = s"JustRowsHostSCB size:$sizeInBytes, rows:$numRows"
}

/**
Expand Down Expand Up @@ -233,6 +241,10 @@ class SpillableHostColumnarBatchImpl (
throw new IllegalStateException("Double free on SpillableHostColumnarBatchImpl")
}
}

override def toString: String =
s"HostSCB size:$sizeInBytes, handle:$handle, rows:$rowCount, types:${sparkTypes.toList}," +
s" refCount:$refCount"
}

object SpillableColumnarBatch {
Expand Down Expand Up @@ -388,6 +400,13 @@ class SpillableBuffer(
override def close(): Unit = {
handle.close()
}

override def toString: String = {
val size = withResource(RapidsBufferCatalog.acquireBuffer(handle)) { rapidsBuffer =>
rapidsBuffer.memoryUsedBytes
}
s"SpillableBuffer size:$size, handle:$handle"
}
}

/**
Expand Down Expand Up @@ -422,6 +441,9 @@ class SpillableHostBuffer(handle: RapidsBufferHandle,
rapidsBuffer.getHostMemoryBuffer
}
}

override def toString: String =
s"SpillableHostBuffer length:$length, handle:$handle"
}

object SpillableBuffer {
Expand Down

0 comments on commit 6cba00d

Please sign in to comment.