Skip to content

Commit

Permalink
Add empty build batch support to unconditional nested loop join
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowe committed Jul 29, 2024
1 parent 330b8b6 commit 03d3f4a
Showing 1 changed file with 49 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ai.rapids.cudf.{ast, GatherMap, NvtxColor, OutOfBoundsPolicy, Scalar, Tab
import ai.rapids.cudf.ast.CompiledExpression
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.shims.{GpuBroadcastJoinMeta, ShimBinaryExecNode}

Expand All @@ -35,7 +36,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.{BooleanType, DataType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

abstract class GpuBroadcastNestedLoopJoinMetaBase(
Expand Down Expand Up @@ -582,23 +583,12 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(

val broadcastRelation = getBroadcastRelation()

val joinCondition = boundCondition.orElse {
// For outer joins use a true condition if there are any columns in the build side
// otherwise use a cross join.
val useTrueCondition = joinType match {
case LeftOuter if gpuBuildSide == GpuBuildRight => right.output.nonEmpty
case RightOuter if gpuBuildSide == GpuBuildLeft => left.output.nonEmpty
case _ => false
}
if (useTrueCondition) Some(GpuLiteral(true)) else None
}

// Sometimes Spark specifies a true condition for a row-count-only join.
// This can happen when the join keys are detected to be constant.
if (isUnconditionalJoin(joinCondition)) {
if (isUnconditionalJoin(boundCondition)) {
doUnconditionalJoin(broadcastRelation)
} else {
doConditionalJoin(broadcastRelation, joinCondition, numFirstTableColumns)
doConditionalJoin(broadcastRelation, boundCondition, numFirstTableColumns)
}
}

Expand Down Expand Up @@ -627,6 +617,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
val buildTime = gpuLongMetric(BUILD_TIME)
val opTime = gpuLongMetric(OP_TIME)
val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
val localJoinType = joinType
// NOTE: this is a def because we want a brand new `ColumnarBatch` to be returned
// per partition (task), since each task is going to be taking ownership
// of a columnar batch via `LazySpillableColumnarBatch`.
Expand Down Expand Up @@ -664,13 +655,23 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
val spillableBuiltBatch = withResource(builtBatch) {
LazySpillableColumnarBatch(_, "built_batch")
}
new CrossJoinIterator(
spillableBuiltBatch,
lazyStream,
targetSizeBytes,
buildSide,
opTime = opTime,
joinTime = joinTime)

localJoinType match {
case LeftOuter if spillableBuiltBatch.numRows == 0 =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
true)
case RightOuter if spillableBuiltBatch.numRows == 0 =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
false)
case _ =>
new CrossJoinIterator(
spillableBuiltBatch,
lazyStream,
targetSizeBytes,
buildSide,
opTime = opTime,
joinTime = joinTime)
}
}
}
joinIterator.map { cb =>
Expand Down Expand Up @@ -825,3 +826,30 @@ class ConditionalNestedLoopExistenceJoinIterator(
}
}
}

/** Iterator for producing batches from an outer join where the build-side table is empty. */
class EmptyOuterNestedLoopJoinIterator(
streamIter: Iterator[ColumnarBatch],
buildTypes: Array[DataType],
isStreamFirst: Boolean) extends Iterator[ColumnarBatch] {
override def hasNext: Boolean = streamIter.hasNext

override def next(): ColumnarBatch = {
withResource(streamIter.next()) { streamBatch =>
withResource(buildNullBatch(streamBatch.numRows())) { nullBatch =>
if (isStreamFirst) {
GpuColumnVector.combineColumns(streamBatch, nullBatch)
} else {
GpuColumnVector.combineColumns(nullBatch, streamBatch)
}
}
}
}

private def buildNullBatch(numRows: Int): ColumnarBatch = {
val cols: Array[ColumnVector] = buildTypes.safeMap { dt =>
GpuColumnVector.fromNull(numRows, dt)
}
new ColumnarBatch(cols, numRows)
}
}

0 comments on commit 03d3f4a

Please sign in to comment.