Skip to content

Commit

Permalink
Optimize semaphore acquisition in GpuShuffledHashJoinExec (#4588)
Browse files Browse the repository at this point in the history
* Optimize semaphore acquisition in GpuShuffledHashJoinExec

Signed-off-by: Alessandro Bellina <[email protected]>

* Update buildTime in the non-optimal case as well

* Refactor for ease of understanding and address review concerns

* Fix review nit

* Remove extra comment

* Update for review comments

* Add utility to work with rows-only HostConcatResult objects

* Apply review suggestions to HostConcatResultUtil and
GpuShuffledHashJoinExec

* Update copyrights

* Revert copyright change

* Fix unit tests

* Apply review comment

* Use arrays with safeMap
  • Loading branch information
abellina authored Feb 8, 2022
1 parent 0d4c496 commit be0f6f7
Show file tree
Hide file tree
Showing 9 changed files with 851 additions and 161 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.rapids.cudf

import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.{Arm, GpuColumnVectorFromBuffer}

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

object HostConcatResultUtil extends Arm {
/**
* Create a rows-only `HostConcatResult`.
*/
def rowsOnlyHostConcatResult(numRows: Int): HostConcatResult = {
new HostConcatResult(
new JCudfSerialization.SerializedTableHeader(
Array.empty, numRows, 0L),
HostMemoryBuffer.allocate(0, false))
}

/**
* Given a `HostConcatResult` and a SparkSchema produce a `ColumnarBatch`,
* handling the rows-only case.
*
* @note This function does not consume the `HostConcatResult`, and
* callers are responsible for closing the resulting `ColumnarBatch`
*/
def getColumnarBatch(
hostConcatResult: HostConcatResult,
sparkSchema: Array[DataType]): ColumnarBatch = {
if (hostConcatResult.getTableHeader.getNumColumns == 0) {
// We expect the caller to have acquired the GPU unconditionally before calling
// `getColumnarBatch`, as a downstream exec may need the GPU, and the assumption is
// that it is acquired in the coalesce code.
new ColumnarBatch(Array.empty, hostConcatResult.getTableHeader.getNumRows)
} else {
withResource(hostConcatResult.toContiguousTable) { ct =>
GpuColumnVectorFromBuffer.from(ct, sparkSchema)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,15 +46,28 @@ object ConcatAndConsumeAll {
* @return a single batch with all of them concated together.
*/
def buildNonEmptyBatch(arrayOfBatches: Array[ColumnarBatch],
schema: StructType): ColumnarBatch = {
schema: StructType): ColumnarBatch =
buildNonEmptyBatchFromTypes(
arrayOfBatches, GpuColumnVector.extractTypes(schema))

/**
* Build a single batch from the batches collected so far. If array is empty this will likely
* blow up.
* @param arrayOfBatches the batches to concat. This will be consumed and you do not need to
* close any of the batches after this is called.
* @param dataTypes the output types.
* @return a single batch with all of them concated together.
*/
def buildNonEmptyBatchFromTypes(arrayOfBatches: Array[ColumnarBatch],
dataTypes: Array[DataType]): ColumnarBatch = {
if (arrayOfBatches.length == 1) {
arrayOfBatches(0)
} else {
val tables = arrayOfBatches.map(GpuColumnVector.from)
try {
val combined = Table.concatenate(tables: _*)
try {
GpuColumnVector.from(combined, GpuColumnVector.extractTypes(schema))
GpuColumnVector.from(combined, dataTypes)
} finally {
combined.close()
}
Expand Down Expand Up @@ -410,9 +423,8 @@ abstract class AbstractGpuCoalesceIterator(
}

class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
schema: StructType,
sparkTypes: Array[DataType],
goal: CoalesceSizeGoal,
maxDecompressBatchMemory: Long,
numInputRows: GpuMetric,
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
Expand All @@ -422,8 +434,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
opTime: GpuMetric,
peakDevMemory: GpuMetric,
spillCallback: SpillCallback,
opName: String,
codecConfigs: TableCompressionCodecConfig)
opName: String)
extends AbstractGpuCoalesceIterator(iter,
goal,
numInputRows,
Expand All @@ -435,8 +446,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
opTime,
opName) with Arm {

private val sparkTypes: Array[DataType] = GpuColumnVector.extractTypes(schema)
private val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty
protected val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty
private var maxDeviceMemory: Long = 0

override def initNewBatch(batch: ColumnarBatch): Unit = {
Expand All @@ -448,10 +458,85 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
spillCallback))

protected def popAll(): Array[ColumnarBatch] = {
closeOnExcept(batches.toArray.safeMap(_.getColumnarBatch())) { wip =>
batches.safeClose()
batches.clear()
wip
}
}

override def concatAllAndPutOnGPU(): ColumnarBatch = {
val ret = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(popAll(), sparkTypes)
// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
ret
}

override def cleanupConcatIsDone(): Unit = {
peakDevMemory.set(maxDeviceMemory)
batches.clear()
}

private var onDeck: Option[SpillableColumnarBatch] = None

override protected def hasOnDeck: Boolean = onDeck.isDefined

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
spillCallback))
}

override protected def clearOnDeck(): Unit = {
onDeck.foreach(_.close())
onDeck = None
}

override protected def popOnDeck(): ColumnarBatch = {
val ret = onDeck.get.getColumnarBatch()
clearOnDeck()
ret
}
}

/**
* Compression codec-aware `GpuCoalesceIterator` subclass which should be used in cases
* where the RAPIDS Shuffle Manager could be configured, as batches to be coalesced
* may be compressed.
*/
class GpuCompressionAwareCoalesceIterator(
iter: Iterator[ColumnarBatch],
sparkTypes: Array[DataType],
goal: CoalesceSizeGoal,
maxDecompressBatchMemory: Long,
numInputRows: GpuMetric,
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
collectTime: GpuMetric,
concatTime: GpuMetric,
opTime: GpuMetric,
peakDevMemory: GpuMetric,
spillCallback: SpillCallback,
opName: String,
codecConfigs: TableCompressionCodecConfig)
extends GpuCoalesceIterator(
iter, sparkTypes, goal,
numInputRows = numInputRows,
numInputBatches = numInputBatches,
numOutputRows = numOutputRows,
numOutputBatches = numOutputBatches,
collectTime = collectTime,
concatTime = concatTime,
opTime = opTime,
peakDevMemory = peakDevMemory,
spillCallback, opName) {

private[this] var codec: TableCompressionCodec = _

private[this] def popAllDecompressed(): Array[ColumnarBatch] = {
closeOnExcept(batches.map(_.getColumnarBatch())) { wip =>
override protected def popAll(): Array[ColumnarBatch] = {
closeOnExcept(batches.toArray.safeMap(_.getColumnarBatch())) { wip =>
batches.safeClose()
batches.clear()

Expand Down Expand Up @@ -487,42 +572,9 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
}
}
}
wip.toArray
wip
}
}

override def concatAllAndPutOnGPU(): ColumnarBatch = {
val ret = ConcatAndConsumeAll.buildNonEmptyBatch(popAllDecompressed(), schema)
// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
ret
}

override def cleanupConcatIsDone(): Unit = {
peakDevMemory.set(maxDeviceMemory)
batches.clear()
}

private var onDeck: Option[SpillableColumnarBatch] = None

override protected def hasOnDeck: Boolean = onDeck.isDefined

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
spillCallback))
}

override protected def clearOnDeck(): Unit = {
onDeck.foreach(_.close())
onDeck = None
}

override protected def popOnDeck(): ColumnarBatch = {
val ret = onDeck.get.getColumnarBatch()
clearOnDeck()
ret
}
}

case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
Expand Down Expand Up @@ -579,6 +631,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)

// cache in local vars to avoid serializing the plan
val outputSchema = schema
val dataTypes = GpuColumnVector.extractTypes(outputSchema)
val decompressMemoryTarget = maxDecompressBatchMemory

val batches = child.executeColumnar()
Expand All @@ -593,7 +646,8 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
goal match {
case sizeGoal: CoalesceSizeGoal =>
batches.mapPartitions { iter =>
new GpuCoalesceIterator(iter, outputSchema, sizeGoal, decompressMemoryTarget,
new GpuCompressionAwareCoalesceIterator(
iter, dataTypes, sizeGoal, decompressMemoryTarget,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, NoopMetric,
concatTime, opTime, peakDevMemory, callback, "GpuCoalesceBatches",
codecConfigs)
Expand Down
Loading

0 comments on commit be0f6f7

Please sign in to comment.