Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize semaphore acquisition in GpuShuffledHashJoinExec #4588

Merged
merged 14 commits into from
Feb 8, 2022
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.rapids.cudf

import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.{Arm, GpuColumnVectorFromBuffer}

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

object HostConcatResultUtil extends Arm {
/**
* Create a rows-only `HostConcatResult`.
*/
def rowsOnlyHostConcatResult(numRows: Int): HostConcatResult = {
new HostConcatResult(
new JCudfSerialization.SerializedTableHeader(
Array.empty, numRows, 0L),
HostMemoryBuffer.allocate(0, false))
}

/**
* Given a `HostConcatResult` and a SparkSchema produce a `ColumnarBatch`,
* handling the rows-only case.
*
* @note This function does not consume the `HostConcatResult`, and
* callers are responsible for closing the resulting `ColumnarBatch`
*/
def getColumnarBatch(
hostConcatResult: HostConcatResult,
sparkSchema: Array[DataType]): ColumnarBatch = {
if (hostConcatResult.getTableHeader.getNumColumns == 0) {
// We expect the caller to have acquired the GPU unconditionally before calling
// `getColumnarBatch`, as a downstream exec may need the GPU, and the assumption is
// that it is acquired in the coalesce code.
new ColumnarBatch(Array.empty, hostConcatResult.getTableHeader.getNumRows)
} else {
withResource(hostConcatResult.toContiguousTable) { ct =>
GpuColumnVectorFromBuffer.from(ct, sparkSchema)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,15 +46,28 @@ object ConcatAndConsumeAll {
* @return a single batch with all of them concated together.
revans2 marked this conversation as resolved.
Show resolved Hide resolved
jlowe marked this conversation as resolved.
Show resolved Hide resolved
*/
def buildNonEmptyBatch(arrayOfBatches: Array[ColumnarBatch],
schema: StructType): ColumnarBatch = {
schema: StructType): ColumnarBatch =
buildNonEmptyBatchFromTypes(
arrayOfBatches, GpuColumnVector.extractTypes(schema))

/**
* Build a single batch from the batches collected so far. If array is empty this will likely
* blow up.
* @param arrayOfBatches the batches to concat. This will be consumed and you do not need to
* close any of the batches after this is called.
* @param dataTypes the output types.
* @return a single batch with all of them concated together.
*/
def buildNonEmptyBatchFromTypes(arrayOfBatches: Array[ColumnarBatch],
dataTypes: Array[DataType]): ColumnarBatch = {
if (arrayOfBatches.length == 1) {
arrayOfBatches(0)
} else {
val tables = arrayOfBatches.map(GpuColumnVector.from)
try {
val combined = Table.concatenate(tables: _*)
try {
GpuColumnVector.from(combined, GpuColumnVector.extractTypes(schema))
GpuColumnVector.from(combined, dataTypes)
} finally {
combined.close()
}
Expand Down Expand Up @@ -410,9 +423,8 @@ abstract class AbstractGpuCoalesceIterator(
}

class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
schema: StructType,
sparkTypes: Array[DataType],
goal: CoalesceSizeGoal,
maxDecompressBatchMemory: Long,
numInputRows: GpuMetric,
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
Expand All @@ -422,8 +434,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
opTime: GpuMetric,
peakDevMemory: GpuMetric,
spillCallback: SpillCallback,
opName: String,
codecConfigs: TableCompressionCodecConfig)
opName: String)
extends AbstractGpuCoalesceIterator(iter,
goal,
numInputRows,
Expand All @@ -435,8 +446,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
opTime,
opName) with Arm {

private val sparkTypes: Array[DataType] = GpuColumnVector.extractTypes(schema)
private val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty
protected val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty
private var maxDeviceMemory: Long = 0

override def initNewBatch(batch: ColumnarBatch): Unit = {
Expand All @@ -448,9 +458,84 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
spillCallback))

protected def popAll(): Array[ColumnarBatch] = {
closeOnExcept(batches.map(_.getColumnarBatch())) { wip =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does this need to be a safeMap?

batches.safeClose()
batches.clear()
wip.toArray
}
}

override def concatAllAndPutOnGPU(): ColumnarBatch = {
val ret = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(popAll(), sparkTypes)
// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
ret
}

override def cleanupConcatIsDone(): Unit = {
peakDevMemory.set(maxDeviceMemory)
batches.clear()
}

private var onDeck: Option[SpillableColumnarBatch] = None

override protected def hasOnDeck: Boolean = onDeck.isDefined

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
spillCallback))
}

override protected def clearOnDeck(): Unit = {
onDeck.foreach(_.close())
onDeck = None
}

override protected def popOnDeck(): ColumnarBatch = {
val ret = onDeck.get.getColumnarBatch()
clearOnDeck()
ret
}
}

/**
* Compression codec-aware `GpuCoalesceIterator` subclass which should be used in cases
* where the RAPIDS Shuffle Manager could be configured, as batches to be coalesced
* may be compressed.
*/
class GpuCompressionAwareCoalesceIterator(
iter: Iterator[ColumnarBatch],
sparkTypes: Array[DataType],
goal: CoalesceSizeGoal,
maxDecompressBatchMemory: Long,
numInputRows: GpuMetric,
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric,
collectTime: GpuMetric,
concatTime: GpuMetric,
opTime: GpuMetric,
peakDevMemory: GpuMetric,
spillCallback: SpillCallback,
opName: String,
codecConfigs: TableCompressionCodecConfig)
extends GpuCoalesceIterator(
iter, sparkTypes, goal,
numInputRows = numInputRows,
numInputBatches = numInputBatches,
numOutputRows = numOutputRows,
numOutputBatches = numOutputBatches,
collectTime = collectTime,
concatTime = concatTime,
opTime = opTime,
peakDevMemory = peakDevMemory,
spillCallback, opName) {

private[this] var codec: TableCompressionCodec = _

private[this] def popAllDecompressed(): Array[ColumnarBatch] = {
override protected def popAll(): Array[ColumnarBatch] = {
closeOnExcept(batches.map(_.getColumnarBatch())) { wip =>
batches.safeClose()
batches.clear()
Expand Down Expand Up @@ -490,39 +575,6 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
wip.toArray
}
}

override def concatAllAndPutOnGPU(): ColumnarBatch = {
val ret = ConcatAndConsumeAll.buildNonEmptyBatch(popAllDecompressed(), schema)
// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
ret
}

override def cleanupConcatIsDone(): Unit = {
peakDevMemory.set(maxDeviceMemory)
batches.clear()
}

private var onDeck: Option[SpillableColumnarBatch] = None

override protected def hasOnDeck: Boolean = onDeck.isDefined

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
spillCallback))
}

override protected def clearOnDeck(): Unit = {
onDeck.foreach(_.close())
onDeck = None
}

override protected def popOnDeck(): ColumnarBatch = {
val ret = onDeck.get.getColumnarBatch()
clearOnDeck()
ret
}
}

case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
Expand Down Expand Up @@ -579,6 +631,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)

// cache in local vars to avoid serializing the plan
val outputSchema = schema
val dataTypes = GpuColumnVector.extractTypes(outputSchema)
val decompressMemoryTarget = maxDecompressBatchMemory

val batches = child.executeColumnar()
Expand All @@ -593,7 +646,8 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
goal match {
case sizeGoal: CoalesceSizeGoal =>
batches.mapPartitions { iter =>
new GpuCoalesceIterator(iter, outputSchema, sizeGoal, decompressMemoryTarget,
new GpuCompressionAwareCoalesceIterator(
iter, dataTypes, sizeGoal, decompressMemoryTarget,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, NoopMetric,
concatTime, opTime, peakDevMemory, callback, "GpuCoalesceBatches",
codecConfigs)
Expand Down
Loading