diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
index b7fea71d3ef..cc1196d44e4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -690,6 +690,11 @@ case class BatchesToCoalesce(batches: Array[SpillableColumnarBatch])
   override def close(): Unit = {
     batches.safeClose()
   }
+
+  override def toString: String = {
+    val totalSize = batches.map(_.sizeInBytes).sum
+    s"BatchesToCoalesce totalSize:$totalSize, batches:[${batches.mkString(";")}]"
+  }
 }
 
 class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
index cf83c5b1264..239b7a3d4c0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
@@ -953,6 +953,9 @@ class BatchToGenerate(val fixUpOffset: Long, val spillable: SpillableColumnarBat
   override def close(): Unit = {
     spillable.close()
   }
+
+  override def toString: String =
+    s"BatchToGenerate fixUpOffset:$fixUpOffset, spillable:$spillable"
 }
 
 class GpuGenerateIterator(
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala
index d86aa596325..04bc56af0c4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -311,6 +311,14 @@ object RmmRapidsRetryIterator extends Logging {
     override def iterator: Iterator[T] = ts.iterator
 
     override def apply(idx: Int): T = ts.apply(idx)
+
+    override def toString(): String = {
+      val totalSize = ts.map {
+        case scb: SpillableColumnarBatch => scb.sizeInBytes
+        case _ => 0L
+      }.sum
+      s"AutoCloseableSeqInternal totalSize:$totalSize, inner:[${ts.mkString(";")}]"
+    }
   }
 
   /**
@@ -454,14 +462,42 @@
       // there is likely not much we can do, and for now we don't handle
       // this OOM
       if (splitPolicy == null) {
+        val message = s"could not split inputs and retry. The current attempt: " +
+          s"{${attemptStack.head}}"
         if (isFromGpuOom) {
-          throw new GpuSplitAndRetryOOM("GPU OutOfMemory: could not split inputs and retry")
+          throw new GpuSplitAndRetryOOM(s"GPU OutOfMemory: $message")
         } else {
-          throw new CpuSplitAndRetryOOM("CPU OutOfMemory: could not split inputs and retry")
+          throw new CpuSplitAndRetryOOM(s"CPU OutOfMemory: $message")
         }
       }
-      // splitPolicy must take ownership of the argument
-      val splitted = splitPolicy(attemptStack.pop())
+      val curAttempt = attemptStack.pop()
+      // Get the info before running the split, since the attempt may be closed after splitting.
+      val attemptAsString = closeOnExcept(curAttempt)(_.toString)
+      val splitted = try {
+        // splitPolicy must take ownership of the argument
+        splitPolicy(curAttempt)
+      } catch {
+        // We only care about OOM exceptions, wrapping each in a new exception of the
+        // same type to provide more context for the OOM. This looks a little odd
+        // because we cannot change the type of the root exception; otherwise, some
+        // unit tests would fail due to the wrong exception type being thrown.
+        case go: GpuRetryOOM =>
+          throw new GpuRetryOOM(
+            s"GPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
+          ).initCause(go)
+        case go: GpuSplitAndRetryOOM =>
+          throw new GpuSplitAndRetryOOM(
+            s"GPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
+          ).initCause(go)
+        case co: CpuRetryOOM =>
+          throw new CpuRetryOOM(
+            s"CPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
+          ).initCause(co)
+        case co: CpuSplitAndRetryOOM =>
+          throw new CpuSplitAndRetryOOM(
+            s"CPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
+          ).initCause(co)
+      }
       // the splitted sequence needs to be inserted in reverse order
       // so we try the first item first.
       splitted.reverse.foreach(attemptStack.push)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
index d5216cbda9f..e1f45c34180 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
@@ -55,6 +55,9 @@ trait SpillableColumnarBatch extends AutoCloseable {
   def sizeInBytes: Long
 
   def dataTypes: Array[DataType]
+
+  override def toString: String =
+    s"SCB size:$sizeInBytes, types:${dataTypes.toList}, rows:${numRows()}"
 }
 
 /**
@@ -79,6 +82,8 @@ class JustRowsColumnarBatch(numRows: Int)
 
   // There is no off heap data and close is a noop so just return this
   override def incRefCount(): SpillableColumnarBatch = this
+
+  override def toString: String = s"JustRowsSCB size:$sizeInBytes, rows:$numRows"
 }
 
 /**
@@ -148,7 +153,8 @@ class SpillableColumnarBatchImpl (
   }
 
   override def toString: String =
-    s"SCB $handle $rowCount ${sparkTypes.toList} $refCount"
+    s"GpuSCB size:$sizeInBytes, handle:$handle, rows:$rowCount, types:${sparkTypes.toList}," +
+      s" refCount:$refCount"
 }
 
 class JustRowsHostColumnarBatch(numRows: Int)
@@ -167,6 +173,8 @@ class JustRowsHostColumnarBatch(numRows: Int)
 
   // There is no off heap data and close is a noop so just return this
   override def incRefCount(): SpillableColumnarBatch = this
+
+  override def toString: String = s"JustRowsHostSCB size:$sizeInBytes, rows:$numRows"
 }
 
 /**
@@ -233,6 +241,10 @@ class SpillableHostColumnarBatchImpl (
       throw new IllegalStateException("Double free on SpillableHostColumnarBatchImpl")
     }
   }
+
+  override def toString: String =
+    s"HostSCB size:$sizeInBytes, handle:$handle, rows:$rowCount, types:${sparkTypes.toList}," +
+      s" refCount:$refCount"
 }
 
 object SpillableColumnarBatch {
@@ -388,6 +400,13 @@ class SpillableBuffer(
   override def close(): Unit = {
     handle.close()
   }
+
+  override def toString: String = {
+    val size = withResource(RapidsBufferCatalog.acquireBuffer(handle)) { rapidsBuffer =>
+      rapidsBuffer.memoryUsedBytes
+    }
+    s"SpillableBuffer size:$size, handle:$handle"
+  }
 }
 
 /**
@@ -422,6 +441,9 @@ class SpillableHostBuffer(handle: RapidsBufferHandle,
       rapidsBuffer.getHostMemoryBuffer
     }
   }
+
+  override def toString: String =
+    s"SpillableHostBuffer length:$length, handle:$handle"
 }
 
 object SpillableBuffer {
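
A minimal, illustrative sketch (not part of the patch) of the wrapping pattern the RmmRapidsRetryIterator.scala change relies on: rethrow an OOM as a new exception of the same type, carrying the attempt's toString in the message, and chain the original via Throwable.initCause so the root stack trace stays reachable. IllustrativeOOM is a hypothetical stand-in for GpuSplitAndRetryOOM, which lives in spark-rapids-jni.

// Hypothetical stand-in for GpuSplitAndRetryOOM (the real class is in spark-rapids-jni).
class IllustrativeOOM(msg: String) extends RuntimeException(msg)

object RetryWrapSketch {
  def main(args: Array[String]): Unit = {
    // Example attempt description, in the format the new toString methods produce.
    val attemptAsString =
      "GpuSCB size:1024, handle:h1, rows:10, types:List(IntegerType), refCount:1"
    try {
      try {
        // Simulate the split itself running out of memory.
        throw new IllustrativeOOM("GPU OutOfMemory: allocation failed during split")
      } catch {
        case oom: IllustrativeOOM =>
          // Same type, richer message; initCause keeps the root OOM for debugging.
          throw new IllustrativeOOM(
            s"GPU OutOfMemory: Could not split the current attempt: {$attemptAsString}"
          ).initCause(oom)
      }
    } catch {
      case wrapped: Throwable =>
        println(wrapped.getMessage)          // contextual message with attempt info
        println(wrapped.getCause.getMessage) // original OOM is still reachable
    }
  }
}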