Support for LeftOuter/BuildRight and RightOuter/BuildLeft nested loop joins (#3242)

Signed-off-by: Jason Lowe <[email protected]>
jlowe authored Aug 20, 2021
1 parent d3ae0d0 commit 92a200e
Showing 9 changed files with 663 additions and 424 deletions.
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -339,7 +339,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.BroadcastExchangeExec"></a>spark.rapids.sql.exec.BroadcastExchangeExec|The backend for broadcast exchange of data|true|None|
<a name="sql.exec.ShuffleExchangeExec"></a>spark.rapids.sql.exec.ShuffleExchangeExec|The backend for most data being exchanged between processes|true|None|
<a name="sql.exec.BroadcastHashJoinExec"></a>spark.rapids.sql.exec.BroadcastHashJoinExec|Implementation of join using broadcast data|true|None|
<a name="sql.exec.BroadcastNestedLoopJoinExec"></a>spark.rapids.sql.exec.BroadcastNestedLoopJoinExec|Implementation of join using brute force|true|None|
<a name="sql.exec.BroadcastNestedLoopJoinExec"></a>spark.rapids.sql.exec.BroadcastNestedLoopJoinExec|Implementation of join using brute force. Full outer joins and joins where the broadcast side matches the join side (e.g.: LeftOuter with left broadcast) are not supported. A non-inner join only is supported if the join condition expression can be converted to a GPU AST expression|true|None|
<a name="sql.exec.CartesianProductExec"></a>spark.rapids.sql.exec.CartesianProductExec|Implementation of join using brute force|true|None|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
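As an illustration of the newly supported shape, here is a minimal sketch (hypothetical data and column names, not from this commit) of a query that plans a BroadcastNestedLoopJoinExec with LeftOuter/BuildRight: there are no equi-join keys, the right side is broadcast, and the comparison condition can compile to a GPU AST expression.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BnljLeftOuterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bnlj-left-outer").getOrCreate()
    import spark.implicits._

    val left = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b")
    val right = Seq((15, "x"), (25, "y")).toDF("c", "d")

    // No equality predicate, so Spark plans a BroadcastNestedLoopJoinExec.
    // A left outer join that broadcasts the right side (BuildRight) is one
    // of the combinations this commit moves onto the GPU, provided the
    // condition below can be converted to a GPU AST expression.
    val joined = left.join(broadcast(right), $"b" > $"c", "left_outer")
    joined.explain()
    joined.show()
  }
}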
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -729,7 +729,7 @@ Accelerator supports are described below.
</tr>
<tr>
<td rowspan="1">BroadcastNestedLoopJoinExec</td>
<td rowspan="1">Implementation of join using brute force</td>
<td rowspan="1">Implementation of join using brute force. Full outer joins and joins where the broadcast side matches the join side (e.g.: LeftOuter with left broadcast) are not supported. A non-inner join only is supported if the join condition expression can be converted to a GPU AST expression</td>
<td>Input</td>
<td>None</td>
<td>S</td>
122 changes: 93 additions & 29 deletions integration_tests/src/main/python/join_test.py

Large diffs are not rendered by default.

310 changes: 310 additions & 0 deletions AbstractGpuJoinIterator.scala
@@ -0,0 +1,310 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.collection.mutable

import ai.rapids.cudf.{GatherMap, NvtxColor}
import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Base class for iterators producing the results of a join.
* @param gatherNvtxName name to use for the NVTX range when producing the join gather maps
* @param targetSize configured target batch size in bytes
* @param joinTime metric to record GPU time spent in join
* @param totalTime metric to record total time in the iterator
*/
abstract class AbstractGpuJoinIterator(
gatherNvtxName: String,
targetSize: Long,
joinTime: GpuMetric,
totalTime: GpuMetric) extends Iterator[ColumnarBatch] with Arm with AutoCloseable {
private[this] var nextCb: Option[ColumnarBatch] = None
private[this] var gathererStore: Option[JoinGatherer] = None

protected[this] var closed = false

TaskContext.get().addTaskCompletionListener[Unit](_ => close())

/** Returns whether there are any more batches on the stream side of the join */
protected def hasNextStreamBatch: Boolean

/**
* Called to set up the next join gatherer instance when the previous instance is done or
* there is no previous instance.
* @param startNanoTime system nanoseconds timestamp at the top of the iterator loop, useful for
* calculating the time spent producing the next stream batch
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
protected def setupNextGatherer(startNanoTime: Long): Option[JoinGatherer]

override def hasNext: Boolean = {
if (closed) {
return false
}
var mayContinue = true
while (nextCb.isEmpty && mayContinue) {
val startNanoTime = System.nanoTime()
if (gathererStore.exists(!_.isDone)) {
nextCb = nextCbFromGatherer()
} else if (hasNextStreamBatch) {
// Need to refill the gatherer
gathererStore.foreach(_.close())
gathererStore = None
gathererStore = setupNextGatherer(startNanoTime)
nextCb = nextCbFromGatherer()
} else {
mayContinue = false
}
totalTime += (System.nanoTime() - startNanoTime)
}
if (nextCb.isEmpty) {
// Nothing is left to return so close ASAP.
close()
}
nextCb.isDefined
}

override def next(): ColumnarBatch = {
if (!hasNext) {
throw new NoSuchElementException()
}
val ret = nextCb.get
nextCb = None
ret
}

override def close(): Unit = {
if (!closed) {
nextCb.foreach(_.close())
nextCb = None
gathererStore.foreach(_.close())
gathererStore = None
closed = true
}
}

private def nextCbFromGatherer(): Option[ColumnarBatch] = {
withResource(new NvtxWithMetrics(gatherNvtxName, NvtxColor.DARK_GREEN, joinTime)) { _ =>
val ret = gathererStore.map { gather =>
val nextRows = JoinGatherer.getRowsInNextBatch(gather, targetSize)
gather.gatherNext(nextRows)
}
if (gathererStore.exists(_.isDone)) {
gathererStore.foreach(_.close())
gathererStore = None
}

if (ret.isDefined) {
// We are about to return something. We got everything we need from it so now let it spill
// if there is more to be gathered later on.
gathererStore.foreach(_.allowSpilling())
}
ret
}
}
}

/**
* Base class for join iterators that split and spill batches to avoid GPU OOM errors.
* @param gatherNvtxName name to use for the NVTX range when producing the join gather maps
* @param stream iterator to produce the batches for the streaming side input of the join
* @param streamAttributes attributes corresponding to the streaming side input
* @param builtBatch batch for the built side input of the join
* @param targetSize configured target batch size in bytes
* @param spillCallback callback to use when spilling
* @param joinTime metric to record GPU time spent in join
* @param streamTime metric to record time spent producing streaming side batches
* @param totalTime metric to record total time in the iterator
*/
abstract class SplittableJoinIterator(
gatherNvtxName: String,
stream: Iterator[LazySpillableColumnarBatch],
streamAttributes: Seq[Attribute],
builtBatch: LazySpillableColumnarBatch,
targetSize: Long,
spillCallback: SpillCallback,
joinTime: GpuMetric,
streamTime: GpuMetric,
totalTime: GpuMetric)
extends AbstractGpuJoinIterator(
gatherNvtxName,
targetSize,
joinTime = joinTime,
totalTime = totalTime) with Logging {
// For some join types, even if there is no stream data, we might output something
private var isInitialJoin = true
// If the join explodes this holds batches from the stream side split into smaller pieces.
private val pendingSplits = mutable.Queue[SpillableColumnarBatch]()

protected def computeNumJoinRows(cb: ColumnarBatch): Long

/**
* Create a join gatherer.
* @param cb next column batch from the streaming side of the join
* @param numJoinRows if present, the number of join output rows computed for this batch
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
protected def createGatherer(cb: ColumnarBatch, numJoinRows: Option[Long]): Option[JoinGatherer]

override def hasNextStreamBatch: Boolean = {
isInitialJoin || pendingSplits.nonEmpty || stream.hasNext
}

override def setupNextGatherer(startNanoTime: Long): Option[JoinGatherer] = {
val wasInitialJoin = isInitialJoin
isInitialJoin = false
if (pendingSplits.nonEmpty || stream.hasNext) {
val cb = if (pendingSplits.nonEmpty) {
withResource(pendingSplits.dequeue()) {
_.getColumnarBatch()
}
} else {
val batch = withResource(stream.next()) { lazyBatch =>
lazyBatch.releaseBatch()
}
streamTime += (System.nanoTime() - startNanoTime)
batch
}
withResource(cb) { cb =>
val numJoinRows = computeNumJoinRows(cb)

// We want the gather maps size to be around the target size. There are two gather maps
// that are made up of ints, so compute how many rows on the stream side will produce the
// desired gather maps size.
val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES))
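// Illustrative arithmetic: with a 1 GiB targetSize, maxJoinRows is
// 1073741824 / (2 * 4) = 134,217,728 join output rows per gather batch.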
if (numJoinRows > maxJoinRows && cb.numRows() > 1) {
// Need to split the batch to reduce the gather maps size. This takes a simplistic
// approach of assuming the data is uniformly distributed in the stream table.
val numSplits = Math.min(cb.numRows(),
Math.ceil(numJoinRows.toDouble / maxJoinRows).toInt)
splitAndSave(cb, numSplits)

// Return no gatherer so the outer loop will try again
return None
}

createGatherer(cb, Some(numJoinRows))
}
} else {
assert(wasInitialJoin)
import scala.collection.JavaConverters._
withResource(GpuColumnVector.emptyBatch(streamAttributes.asJava)) { cb =>
createGatherer(cb, None)
}
}
}

override def close(): Unit = {
if (!closed) {
super.close()
builtBatch.close()
pendingSplits.foreach(_.close())
pendingSplits.clear()
}
}

/**
* Split a stream-side input batch, making all splits spillable, and replacing this batch with
* the splits in the stream-side input
* @param cb stream-side input batch to split
* @param numBatches number of splits to produce with approximately the same number of rows each
* @param oom a prior OOM exception that this will try to recover from by splitting
*/
protected def splitAndSave(
cb: ColumnarBatch,
numBatches: Int,
oom: Option[OutOfMemoryError] = None): Unit = {
val batchSize = cb.numRows() / numBatches
if (oom.isDefined && batchSize < 100) {
// We just need some kind of cutoff to not get stuck in a loop if the batches get to be too
// small but we want to at least give it a chance to work (mostly for tests where the
// targetSize can be set really small)
throw oom.get
}
val msg = s"Split stream batch into $numBatches batches of about $batchSize rows"
if (oom.isDefined) {
logWarning(s"OOM Encountered: $msg")
} else {
logInfo(msg)
}
val splits = withResource(GpuColumnVector.from(cb)) { tab =>
val splitIndexes = (1 until numBatches).map(num => num * batchSize)
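// e.g. a 10-row batch split into numBatches = 3: batchSize = 3,
// splitIndexes = (3, 6), yielding contiguous pieces of 3, 3, and 4 rows.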
tab.contiguousSplit(splitIndexes: _*)
}
withResource(splits) { splits =>
val schema = GpuColumnVector.extractTypes(cb)
pendingSplits ++= splits.map { ct =>
SpillableColumnarBatch(ct, schema,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCallback)
}
}
}

/**
* Create a join gatherer from gather maps.
* @param maps gather maps produced from a cudf join
* @param leftData batch corresponding to the left table in the join
* @param rightData batch corresponding to the right table in the join
* @return some gatherer or None if there are no rows to gather in this join batch
*/
protected def makeGatherer(
maps: Array[GatherMap],
leftData: LazySpillableColumnarBatch,
rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
assert(maps.length > 0 && maps.length <= 2)
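// One map arrives for joins that gather from a single side (e.g. semi
// and anti joins); two maps cover the left and right sides respectively.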
try {
val leftMap = maps.head
val rightMap = if (maps.length > 1) {
if (rightData.numCols == 0) {
// No data so don't bother with it
None
} else {
Some(maps(1))
}
} else {
None
}

val lazyLeftMap = LazySpillableGatherMap(leftMap, spillCallback, "left_map")
val gatherer = rightMap match {
case None =>
rightData.close()
JoinGatherer(lazyLeftMap, leftData)
case Some(right) =>
val lazyRightMap = LazySpillableGatherMap(right, spillCallback, "right_map")
JoinGatherer(lazyLeftMap, leftData, lazyRightMap, rightData)
}
if (gatherer.isDone) {
// Nothing matched...
gatherer.close()
None
} else {
Some(gatherer)
}
} finally {
maps.foreach(_.close())
}
}
}
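To make the iterator contract concrete, here is a minimal sketch (not part of this commit) of a subclass that serves a single pre-built JoinGatherer; real subclasses such as SplittableJoinIterator add stream-batch handling, spilling, and splitting on top of the same two overrides:

// Assumes the repo classes above: JoinGatherer, GpuMetric, and
// AbstractGpuJoinIterator as defined in this file.
class SingleGathererJoinIterator(
    gatherer: JoinGatherer,
    targetSize: Long,
    joinTime: GpuMetric,
    totalTime: GpuMetric)
  extends AbstractGpuJoinIterator("example gather", targetSize, joinTime, totalTime) {

  private var consumed = false

  // Exactly one "stream batch": the pre-built gatherer below.
  override protected def hasNextStreamBatch: Boolean = !consumed

  // Hand the gatherer to the base class, which slices its output into
  // batches of roughly targetSize bytes via nextCbFromGatherer().
  override protected def setupNextGatherer(startNanoTime: Long): Option[JoinGatherer] = {
    consumed = true
    Some(gatherer)
  }
}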
@@ -3082,7 +3082,10 @@ object GpuOverrides {
TypeSig.DECIMAL_64 + TypeSig.STRUCT), TypeSig.all),
(exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)),
exec[BroadcastNestedLoopJoinExec](
"Implementation of join using brute force",
"Implementation of join using brute force. Full outer joins and joins where the " +
"broadcast side matches the join side (e.g.: LeftOuter with left broadcast) are not " +
"supported. A non-inner join only is supported if the join condition expression can " +
"be converted to a GPU AST expression",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64),
TypeSig.all),
@@ -201,6 +201,12 @@ trait LazySpillableColumnarBatch extends LazySpillable {
* Get the batch that this wraps and unspill it if needed.
*/
def getBatch: ColumnarBatch

/**
* Release the underlying batch to the caller who is responsible for closing it. The resulting
* batch will NOT be closed when this instance is closed.
*/
def releaseBatch(): ColumnarBatch
}

object LazySpillableColumnarBatch {
@@ -218,13 +224,20 @@ object LazySpillableColumnarBatch {
/**
* A version of `LazySpillableColumnarBatch` where instead of closing the underlying
* batch it is only spilled. This is used for cases, like with a streaming hash join
- * where the data itself needs to out live the JoinGatherer it is haded off to.
+ * where the data itself needs to out live the JoinGatherer it is handed off to.
*/
case class AllowSpillOnlyLazySpillableColumnarBatchImpl(wrapped: LazySpillableColumnarBatch)
-  extends LazySpillableColumnarBatch {
+  extends LazySpillableColumnarBatch with Arm {
override def getBatch: ColumnarBatch =
wrapped.getBatch

override def releaseBatch(): ColumnarBatch = {
closeOnExcept(GpuColumnVector.incRefCounts(wrapped.getBatch)) { batch =>
wrapped.allowSpilling()
batch
}
}

override def numRows: Int = wrapped.numRows
override def numCols: Int = wrapped.numCols
override def deviceMemorySize: Long = wrapped.deviceMemorySize
@@ -262,7 +275,15 @@ class LazySpillableColumnarBatchImpl(
cached = spill.map(_.getColumnarBatch())
}
}
-    cached.get
+    cached.getOrElse(throw new IllegalStateException("batch is closed"))
}

override def releaseBatch(): ColumnarBatch = {
closeOnExcept(getBatch) { batch =>
cached = None
close()
batch
}
}

override def allowSpilling(): Unit = {
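The new releaseBatch API is what lets SplittableJoinIterator take ownership of each stream batch before handing it to a gatherer. A short usage sketch mirroring the call in setupNextGatherer above (assumed names, with Arm's withResource in scope):

// `stream` is an Iterator[LazySpillableColumnarBatch]. Closing the wrapper
// after releaseBatch() does NOT close the released batch; the caller now
// owns it and must close it.
val cb: ColumnarBatch = withResource(stream.next()) { lazyBatch =>
  lazyBatch.releaseBatch()
}
withResource(cb) { batch =>
  // ... use the batch; it is closed when this block exits ...
}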