diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index a1ec04743e6..5050c97b96f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -184,12 +184,22 @@ class AcceleratedColumnarToRowIterator( } } +/** + * ColumnarToRowIterator converts GPU ColumnarBatches to CPU InternalRows. + * + * @note releaseSemaphore = true (default) should only be used in cases where + * we are sure that no GPU memory is left unaccounted for (not spillable). + * One notable case where releaseSemaphore is false is when used in + * `GpuUserDefinedFunction`, which is evaluated as part of a projection, that + * may or may not include other GPU columns. + */ class ColumnarToRowIterator(batches: Iterator[ColumnarBatch], numInputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric, streamTime: GpuMetric, - nullSafe: Boolean = false) extends Iterator[InternalRow] with Arm { + nullSafe: Boolean = false, + releaseSemaphore: Boolean = true) extends Iterator[InternalRow] with Arm { // GPU batches read in must be closed by the receiver (us) @transient private var cb: ColumnarBatch = null private var it: java.util.Iterator[InternalRow] = null @@ -227,8 +237,11 @@ class ColumnarToRowIterator(batches: Iterator[ColumnarBatch], numOutputRows += cb.numRows() } finally { devCb.close() - // Leaving the GPU for a while - GpuSemaphore.releaseIfNecessary(TaskContext.get()) + // Leaving the GPU for a while: if this iterator is configured to release + // the semaphore, do it now. + if (releaseSemaphore) { + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + } } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala index 32c3d76d76e..ac3afc137dc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -133,7 +133,10 @@ trait GpuRowBasedUserDefinedFunction extends GpuExpression NoopMetric, NoopMetric, NoopMetric, - nullSafe).foreach { row => + nullSafe, + // ensure `releaseSemaphore` is false so we don't release the semaphore + // mid projection. + releaseSemaphore = false).foreach { row => retRow.update(0, evaluateRow(row)) retConverter.append(retRow, 0, builder) }