Skip to content

Commit

Permalink
Refactor full join iterator to allow access to build tracker (#10246)
Browse files Browse the repository at this point in the history
* Refactor full join iterator to allow access to build tracker

Signed-off-by: Jason Lowe <[email protected]>

* Add comments and close stream iterator earlier

* Allow existing build side tracker to be specified

---------

Signed-off-by: Jason Lowe <[email protected]>
  • Loading branch information
jlowe authored Jan 24, 2024
1 parent 171ead8 commit 35f64fc
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,7 +79,8 @@ abstract class AbstractGpuJoinIterator(
*/
protected def setupNextGatherer(): Option[JoinGatherer]

protected def getFinalBatch(): Option[ColumnarBatch] = None
/** Whether to automatically call close() on this iterator when it is exhausted. */
protected val shouldAutoCloseOnExhaust: Boolean = true

override def hasNext: Boolean = {
if (closed) {
Expand Down Expand Up @@ -107,12 +108,9 @@ abstract class AbstractGpuJoinIterator(
}
}
}
if (nextCb.isEmpty) {
nextCb = getFinalBatch()
if (nextCb.isEmpty) {
// Nothing is left to return so close ASAP.
opTime.ns(close())
}
if (nextCb.isEmpty && shouldAutoCloseOnExhaust) {
// Nothing is left to return so close ASAP.
opTime.ns(close())
}
nextCb.isDefined
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -557,19 +557,37 @@ class ConditionalHashJoinIterator(
}
}


/**
* An iterator that does a hash full join against a stream of batches. It does this by
* doing a left or right outer join and keeping track of the hits on the build side. It then
* produces a final batch of all the build side rows that were not already included.
* An iterator that does the stream-side only of a hash full join In other words, it performs the
* left or right outer join for the stream side's view of a full outer join. As the join is
* performed, the build-side rows that are referenced during the join are tracked and can be
* retrieved after the iteration has completed to assist in performing the anti-join needed to
* produce the final results needed for the full outer join.
*
* @param built spillable form of the build side table. This will be closed by the iterator.
* @param boundBuiltKeys bound expressions for the build side equi-join keys
* @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
* closed by the iterator.
* @param stream iterator to produce batches for the stream side table
* @param boundStreamKeys bound expressions for the stream side equi-join keys
* @param streamAttributes schema of the stream side table
* @param compiledCondition compiled AST expression for the inequality condition of the join,
* if any. NOTE: This will *not* be closed by the iterator.
* @param targetSize target GPU batch size in bytes
* @param buildSide which side of the join is being used for the build side
* @param compareNullsEqual whether to compare nulls as equal during the join
* @param opTime metric to update for total operation time
* @param joinTime metric to update for join time
*/
class HashFullJoinIterator(
class HashFullJoinStreamSideIterator(
built: LazySpillableColumnarBatch,
boundBuiltKeys: Seq[Expression],
buildSideTrackerInit: Option[SpillableColumnarBatch],
stream: Iterator[LazySpillableColumnarBatch],
boundStreamKeys: Seq[Expression],
streamAttributes: Seq[Attribute],
boundCondition: Option[GpuExpression],
numFirstConditionTableColumns: Int,
compiledCondition: Option[CompiledExpression],
targetSize: Long,
buildSide: GpuBuildSide,
compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs
Expand All @@ -588,18 +606,13 @@ class HashFullJoinIterator(
joinTime = joinTime) {
// Full Join is implemented via LeftOuter or RightOuter join, depending on the build side.
private val useLeftOuterJoin = (buildSide == GpuBuildRight)
private val numBuiltRows = built.numRows

private[this] var builtSideTracker : Option[SpillableColumnarBatch] = None

private val nullEquality = if (compareNullsEqual) NullEquality.EQUAL else NullEquality.UNEQUAL

private val compiledConditionRes: Option[CompiledExpression] = boundCondition.map { gpuExpr =>
use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile()))
}
private[this] var builtSideTracker: Option[SpillableColumnarBatch] = buildSideTrackerInit

private def unconditionalLeftJoinGatherMaps(
leftKeys: Table, rightKeys: Table) : Array[GatherMap] = {
leftKeys: Table, rightKeys: Table): Array[GatherMap] = {
if (useLeftOuterJoin) {
leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
} else {
Expand All @@ -614,7 +627,7 @@ class HashFullJoinIterator(
leftData: LazySpillableColumnarBatch,
rightKeys: Table,
rightData: LazySpillableColumnarBatch,
compiledCondition: CompiledExpression) : Array[GatherMap] = {
compiledCondition: CompiledExpression): Array[GatherMap] = {
withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable =>
withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable =>
if (useLeftOuterJoin) {
Expand All @@ -637,8 +650,8 @@ class HashFullJoinIterator(
rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
withResource(new NvtxWithMetrics("full hash join gather map",
NvtxColor.ORANGE, joinTime)) { _ =>
val maps = compiledConditionRes.map { compiledCondition =>
conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, compiledCondition)
val maps = compiledCondition.map { condition =>
conditionalLeftJoinGatherMaps(leftKeys, leftData, rightKeys, rightData, condition)
}.getOrElse {
unconditionalLeftJoinGatherMaps(leftKeys, rightKeys)
}
Expand Down Expand Up @@ -674,55 +687,25 @@ class HashFullJoinIterator(
}
}

// Need to avoid close on exhaust so others can access the built side tracker after iteration.
override protected val shouldAutoCloseOnExhaust: Boolean = false

/**
* Retrieve the tracking data for the build side rows that have been referenced during the
* join. This is normally called after iteration has completed. The caller takes ownership
* of the resulting data and is responsible for closing it.
*/
def releaseBuiltSideTracker(): Option[SpillableColumnarBatch] = {
val result = builtSideTracker
builtSideTracker = None
result
}

override def close(): Unit = {
if (!closed) {
super.close()
compiledConditionRes.foreach(_.close())
builtSideTracker.foreach(_.close())
}
}

override def getFinalBatch(): Option[ColumnarBatch] = {
withResource(new NvtxWithMetrics("get final batch",
NvtxColor.ORANGE, joinTime)) { _ =>
builtSideTracker match {
case None => None
case Some(tracker) => {
val filteredBatch = withResource(tracker) { scb =>
withResource(scb.getColumnarBatch()) { trackerBatch =>
withResource(GpuColumnVector.from(trackerBatch)) { trackerTab =>
val batch = built.getBatch
withResource(GpuColumnVector.from(batch)) { builtTable =>
withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab =>
GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch))
}
}
}
}
}
// Combine build-side columns with null columns for stream side
val ret = withResource(filteredBatch) { builtBatch =>
val numFilterRows = builtBatch.numRows()
if (numFilterRows > 0) {
val streamColumns = streamAttributes.safeMap { attr =>
GpuColumnVector.fromNull(numFilterRows, attr.dataType)
}
withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch =>
buildSide match {
case GpuBuildRight =>
Some(GpuColumnVector.combineColumns(streamBatch, builtBatch))
case GpuBuildLeft =>
Some(GpuColumnVector.combineColumns(builtBatch, streamBatch))
}
}
} else {
None
}
}
builtSideTracker = None
ret
}
}
builtSideTracker = None
}
}

Expand Down Expand Up @@ -768,7 +751,7 @@ class HashFullJoinIterator(
}
}
}.getOrElse {
trueColumnTable(numBuiltRows)
trueColumnTable(built.numRows)
}
withResource(builtTrackingTable) { trackingTable =>
withResource(Scalar.fromBool(false)) { falseScalar =>
Expand All @@ -787,6 +770,125 @@ class HashFullJoinIterator(
}
}

/**
* An iterator that does a hash full join against a stream of batches. It does this by
* doing a left or right outer join and keeping track of the hits on the build side. It then
* produces a final batch of all the build side rows that were not already included.
*
* @param built spillable form of the build side table. This will be closed by the iterator.
* @param boundBuiltKeys bound expressions for the build side equi-join keys
* @param buildSideTrackerInit initial value of the build side row tracker, if any. This will be
* closed by the iterator.
* @param stream iterator to produce batches for the stream side table
* @param boundStreamKeys bound expressions for the stream side equi-join keys
* @param streamAttributes schema of the stream side table
* @param boundCondition expression for the inequality condition of the join, if any
* @param targetSize target GPU batch size in bytes
* @param buildSide which side of the join is being used for the build side
* @param compareNullsEqual whether to compare nulls as equal during the join
* @param opTime metric to update for total operation time
* @param joinTime metric to update for join time
*/
class HashFullJoinIterator(
built: LazySpillableColumnarBatch,
boundBuiltKeys: Seq[Expression],
buildSideTrackerInit: Option[SpillableColumnarBatch],
stream: Iterator[LazySpillableColumnarBatch],
boundStreamKeys: Seq[Expression],
streamAttributes: Seq[Attribute],
boundCondition: Option[GpuExpression],
numFirstConditionTableColumns: Int,
targetSize: Long,
buildSide: GpuBuildSide,
compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs
opTime: GpuMetric,
joinTime: GpuMetric) extends Iterator[ColumnarBatch] with TaskAutoCloseableResource {

private val compiledCondition: Option[CompiledExpression] = boundCondition.map { gpuExpr =>
use(opTime.ns(gpuExpr.convertToAst(numFirstConditionTableColumns).compile()))
}

private val streamJoinIter = new HashFullJoinStreamSideIterator(built, boundBuiltKeys,
buildSideTrackerInit, stream, boundStreamKeys, streamAttributes, compiledCondition, targetSize,
buildSide, compareNullsEqual, opTime, joinTime)

private var finalBatch: Option[ColumnarBatch] = None

override def hasNext: Boolean = {
if (streamJoinIter.hasNext || finalBatch.isDefined) {
true
} else {
finalBatch = getFinalBatch()
// Now that we've manifested the final batch, we can close the stream iterator early to free
// GPU resources.
streamJoinIter.close()
finalBatch.isDefined
}
}

override def next(): ColumnarBatch = {
if (!hasNext) {
throw new NoSuchElementException("batches exhausted")
}
if (streamJoinIter.hasNext) {
streamJoinIter.next()
} else {
val batch = finalBatch.get
finalBatch = None
batch
}
}

override def close(): Unit = {
if (!closed) {
super.close()
streamJoinIter.close()
finalBatch.foreach(_.close())
finalBatch = None
}
}

private def getFinalBatch(): Option[ColumnarBatch] = {
withResource(new NvtxWithMetrics("get final batch", NvtxColor.ORANGE, joinTime)) { _ =>
streamJoinIter.releaseBuiltSideTracker() match {
case None => None
case Some(tracker) =>
val filteredBatch = withResource(tracker) { scb =>
withResource(scb.getColumnarBatch()) { trackerBatch =>
withResource(GpuColumnVector.from(trackerBatch)) { trackerTab =>
val batch = built.getBatch
withResource(GpuColumnVector.from(batch)) { builtTable =>
withResource(builtTable.filter(trackerTab.getColumn(0))) { filterTab =>
GpuColumnVector.from(filterTab, GpuColumnVector.extractTypes(batch))
}
}
}
}
}
// Combine build-side columns with null columns for stream side
withResource(filteredBatch) { builtBatch =>
val numFilterRows = builtBatch.numRows()
if (numFilterRows > 0) {
val streamColumns = streamAttributes.safeMap { attr =>
GpuColumnVector.fromNull(numFilterRows, attr.dataType)
}
withResource(new ColumnarBatch(streamColumns.toArray, numFilterRows)) { streamBatch =>
buildSide match {
case GpuBuildRight =>
Some(GpuColumnVector.combineColumns(streamBatch, builtBatch))
case GpuBuildLeft =>
Some(GpuColumnVector.combineColumns(builtBatch, streamBatch))
}
}
} else {
None
}
}
}
}
}
}

class HashedExistenceJoinIterator(
spillableBuiltBatch: LazySpillableColumnarBatch,
boundBuildKeys: Seq[GpuExpression],
Expand Down Expand Up @@ -1019,7 +1121,7 @@ trait GpuHashJoin extends GpuExec {
opTime,
joinTime)
case FullOuter =>
new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream,
new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, None, lazyStream,
boundStreamKeys, streamedPlan.output, boundCondition, numFirstConditionTableColumns,
targetSize, buildSide, compareNullsEqual, opTime, joinTime)
case _ =>
Expand Down

0 comments on commit 35f64fc

Please sign in to comment.