diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py
index dd7a60893da..12fd1e7e75c 100644
--- a/integration_tests/src/main/python/join_test.py
+++ b/integration_tests/src/main/python/join_test.py
@@ -54,6 +54,9 @@
 double_gen = [pytest.param(DoubleGen(), marks=[incompat])]
 
+# data types supported by AST expressions
+ast_gen = [boolean_gen, byte_gen, short_gen, int_gen, long_gen, timestamp_gen]
+
 _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
                         'spark.sql.join.preferSortMergeJoin': 'True',
                         'spark.sql.shuffle.partitions': '2',
@@ -345,7 +348,7 @@ def do_join(spark):
 @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
 @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_with_conditionals(data_gen, join_type, batch_size):
+def test_broadcast_nested_loop_innerlike_join_with_conditionals(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
         # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
@@ -358,6 +361,58 @@ def do_join(spark):
     conf.update(allow_negative_scale_of_decimal_conf)
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
 
+# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
+# After 3.1.0 is the min spark version we can drop this
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn)
+@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn)
+@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+def test_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size):
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 50, 25)
+        # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
+        # if the sizes are large enough to have both 0.0 and -0.0 show up 500 and 250
+        # but these take a long time to verify so we run with smaller numbers by default
+        # that do not expose the error
+        return left.join(broadcast(right),
+                         (left.b >= right.r_b), join_type)
+    conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
+    conf.update(allow_negative_scale_of_decimal_conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+
+@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+def test_broadcast_nested_loop_join_condition_missing(data_gen, join_type):
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 50, 25)
+        # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
+        # if the sizes are large enough to have both 0.0 and -0.0 show up 500 and 250
+        # but these take a long time to verify so we run with smaller numbers by default
+        # that do not expose the error
+        return left.join(broadcast(right), how=join_type)
+    conf = allow_negative_scale_of_decimal_conf
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+
+@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+def test_broadcast_nested_loop_join_condition_missing_count(data_gen, join_type):
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 50, 25)
+        return left.join(broadcast(right), how=join_type).selectExpr('COUNT(*)')
+    conf = allow_negative_scale_of_decimal_conf
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+
+@allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'GreaterThanOrEqual')
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn)
+def test_broadcast_nested_loop_join_with_conditionals_build_left_fallback(data_gen, join_type):
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 50, 25)
+        return broadcast(left).join(right, (left.b >= right.r_b), join_type)
+    conf = allow_negative_scale_of_decimal_conf
+    assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec', conf=conf)
+
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala
index d12d43616cc..b55bed09378 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala
@@ -21,7 +21,8 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import scala.collection.mutable
 
 import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange}
-import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuMetric, GpuSemaphore, LazySpillableColumnarBatch, MetricsLevel}
+import ai.rapids.cudf.ast
+import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, LazySpillableColumnarBatch, MetricsLevel}
 import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
@@ -29,6 +30,7 @@ import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext,
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.Cross
 import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils, SparkPlan}
 import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase
 import org.apache.spark.sql.types.DataType
@@ -111,7 +113,8 @@ class GpuCartesianPartition(
 
 class GpuCartesianRDD(
     sc: SparkContext,
-    boundCondition: Option[Expression],
+    boundCondition: Option[GpuExpression],
+    numFirstTableColumns: Int,
     spillCallback: SpillCallback,
     targetSize: Long,
     joinTime: GpuMetric,
@@ -182,9 +185,9 @@ class GpuCartesianRDD(
       spillBatchBuffer.toIterator.map(LazySpillableColumnarBatch.spillOnly)
     }
 
-    GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
-      batch, streamIterator, targetSize, GpuBuildLeft, boundCondition,
-      numOutputRows, joinOutputRows, numOutputBatches,
+    GpuBroadcastNestedLoopJoinExecBase.nestedLoopJoin(
+      Cross, numFirstTableColumns, batch, streamIterator, targetSize, GpuBuildLeft,
+      boundCondition, spillCallback, numOutputRows, joinOutputRows, numOutputBatches,
       joinTime, totalTime)
   }
 }
@@ -265,9 +268,11 @@ case class GpuCartesianProductExec(
         numOutputBatches)
     } else {
      val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
+      val numFirstTableColumns = left.output.size
       new GpuCartesianRDD(sparkContext,
         boundCondition,
+        numFirstTableColumns,
         spillCallback,
         targetSizeBytes,
         joinTime,
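A note on the new numFirstTableColumns parameter threaded through GpuCartesianRDD above: Spark binds the join condition against the concatenated output of the two tables, so a bound reference carries a single flat column ordinal, while a cudf AST addresses a column as a (table, index-within-table) pair. Knowing how many columns the first table contributes is enough to translate between the two schemes. A minimal sketch of that mapping, assuming cudf's ast.ColumnReference(int, TableReference) constructor; the helper name is illustrative, not plugin code:

    import ai.rapids.cudf.ast

    // Illustrative only: translate a flat Spark ordinal into a cudf AST column
    // reference, splitting at the first table's column count.
    def toAstColumn(flatOrdinal: Int, numFirstTableColumns: Int): ast.ColumnReference =
      if (flatOrdinal < numFirstTableColumns) {
        new ast.ColumnReference(flatOrdinal, ast.TableReference.LEFT)
      } else {
        new ast.ColumnReference(flatOrdinal - numFirstTableColumns, ast.TableReference.RIGHT)
      }
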
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
index 54a9849c8d2..4b276ae22c1 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
@@ -17,14 +17,16 @@
 package org.apache.spark.sql.rapids.execution
 
 import ai.rapids.cudf.NvtxColor
+import ai.rapids.cudf.ast
 import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback
 
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.{Cross, ExistenceJoin, FullOuter, Inner, InnerLike, JoinType, LeftExistence, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, IdentityBroadcastMode, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
@@ -39,17 +41,25 @@ class GpuBroadcastNestedLoopJoinMeta(
     rule: DataFromReplacementRule)
     extends GpuBroadcastJoinMeta[BroadcastNestedLoopJoinExec](join, conf, parent, rule) {
 
-  val condition: Option[BaseExprMeta[_]] =
+  val conditionMeta: Option[BaseExprMeta[_]] =
     join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
 
-  override val childExprs: Seq[BaseExprMeta[_]] = condition.toSeq
+  override val childExprs: Seq[BaseExprMeta[_]] = conditionMeta.toSeq
 
   override def tagPlanForGpu(): Unit = {
     JoinTypeChecks.tagForGpu(join.joinType, this)
-
+    conditionMeta.foreach(_.tagForAst())
     join.joinType match {
-      case Inner =>
-      case Cross =>
+      case _: InnerLike =>
+      case LeftSemi | LeftAnti =>
+        if (conditionMeta.exists(!_.canThisBeAst)) {
+          val astInfo = conditionMeta.get.explainAst(false)
+          willNotWorkOnGpu(s"AST cannot support join condition:\n$astInfo")
+        }
+        val gpuBuildSide = ShimLoader.getSparkShims.getBuildSide(join)
+        if (gpuBuildSide == GpuBuildLeft) {
+          willNotWorkOnGpu(s"build left not supported for ${join.joinType}")
+        }
       case _ => willNotWorkOnGpu(s"${join.joinType} currently is not supported")
     }
@@ -79,32 +89,46 @@ class GpuBroadcastNestedLoopJoinMeta(
       case GpuBuildRight => right
     }
     verifyBuildSideWasReplaced(buildSide)
+
+    val condition = conditionMeta.map(_.convertToGpu())
+    // Do not yet support AST conditions on anything but semi/anti joins
+    val isAstCondition = join.joinType match {
+      case _: InnerLike => false
+      case LeftSemi | LeftAnti =>
+        conditionMeta.foreach(_.tagForAst())
+        val isAst = conditionMeta.forall(_.canThisBeAst)
+        assert(isAst, s"Non-AST condition in ${join.joinType}")
+        isAst
+      case _ => throw new IllegalStateException(s"${join.joinType} nested loop join not supported")
+    }
+
     val joinExec = ShimLoader.getSparkShims.getGpuBroadcastNestedLoopJoinShim(
       left, right, join,
       join.joinType,
-      None,
+      if (isAstCondition) condition else None,
       conf.gpuTargetBatchSizeBytes)
-    // The GPU does not yet support conditional joins, so conditions are implemented
-    // as a filter after the join when possible.
-    condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
+    if (isAstCondition) {
+      joinExec
+    } else {
+      // condition cannot be implemented via AST so fallback to a post-filter if necessary
+      condition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
+    }
   }
 }
 
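The asymmetry this meta code encodes is worth spelling out: for inner-like joins a condition can still be applied as a GpuFilterExec after an unconditional join, but a semi or anti join drops the build-side columns and changes row multiplicity, so its condition has to be evaluated inside the join itself, hence the hard requirement that the condition be expressible as an AST. A schematic illustration with plain Scala collections (not plugin code):

    // Inner join on l >= r can be done as cross-product-then-filter:
    val leftKeys = Seq(1, 5, 10)
    val rightKeys = Seq(4, 8)
    val inner = for (l <- leftKeys; r <- rightKeys if l >= r) yield (l, r)
    // inner == Seq((5,4), (10,4), (10,8))

    // LeftSemi keeps each matching left row exactly once and emits no right-side
    // columns, so there is no joined (l, r) row left to filter afterwards:
    val semi = leftKeys.filter(l => rightKeys.exists(l >= _))     // Seq(5, 10)
    val anti = leftKeys.filterNot(l => rightKeys.exists(l >= _))  // Seq(1)
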
-/**
- * An iterator that does a cross join against a stream of batches.
- */
-class CrossJoinIterator(
+/** Base class for the join iterators based on a nested loop join */
+abstract class NestedLoopJoinIterator(
+    joinType: JoinType,
     builtBatch: LazySpillableColumnarBatch,
     private val stream: Iterator[LazySpillableColumnarBatch],
-    val targetSize: Long,
-    val buildSide: GpuBuildSide,
+    targetSize: Long,
     private val joinTime: GpuMetric,
     private val totalTime: GpuMetric) extends Iterator[ColumnarBatch] with Arm {
   private var nextCb: Option[ColumnarBatch] = None
   private var gathererStore: Option[JoinGatherer] = None
   private var closed = false
+  private val nvtxName = s"$joinType gather"
 
   def close(): Unit = {
     if (!closed) {
@@ -121,7 +145,7 @@ class CrossJoinIterator(
   TaskContext.get().addTaskCompletionListener[Unit](_ => close())
 
   private def nextCbFromGatherer(): Option[ColumnarBatch] = {
-    withResource(new NvtxWithMetrics("cross join gather", NvtxColor.DARK_GREEN, joinTime)) { _ =>
+    withResource(new NvtxWithMetrics(nvtxName, NvtxColor.DARK_GREEN, joinTime)) { _ =>
       val ret = gathererStore.map { gather =>
         val nextRows = JoinGatherer.getRowsInNextBatch(gather, targetSize)
         gather.gatherNext(nextRows)
@@ -140,35 +164,7 @@
     }
   }
 
-  private def makeGatherer(streamBatch: LazySpillableColumnarBatch): Option[JoinGatherer] = {
-    // Don't close the built side because it will be used for each stream and closed
-    // when the iterator is done.
-    val (leftBatch, rightBatch) = buildSide match {
-      case GpuBuildLeft => (LazySpillableColumnarBatch.spillOnly(builtBatch), streamBatch)
-      case GpuBuildRight => (streamBatch, LazySpillableColumnarBatch.spillOnly(builtBatch))
-    }
-
-    val leftMap = LazySpillableGatherMap.leftCross(leftBatch.numRows, rightBatch.numRows)
-    val rightMap = LazySpillableGatherMap.rightCross(leftBatch.numRows, rightBatch.numRows)
-
-    val joinGatherer = (leftBatch.numCols, rightBatch.numCols) match {
-      case (_, 0) =>
-        rightBatch.close()
-        rightMap.close()
-        JoinGatherer(leftMap, leftBatch)
-      case (0, _) =>
-        leftBatch.close()
-        leftMap.close()
-        JoinGatherer(rightMap, rightBatch)
-      case (_, _) => JoinGatherer(leftMap, leftBatch, rightMap, rightBatch)
-    }
-    if (joinGatherer.isDone) {
-      joinGatherer.close()
-      None
-    } else {
-      Some(joinGatherer)
-    }
-  }
+  protected def makeGatherer(streamBatch: LazySpillableColumnarBatch): Option[JoinGatherer]
 
   override def hasNext: Boolean = {
     if (closed) {
@@ -207,29 +203,147 @@
   }
 }
 
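The refactor above is a template method: the base iterator owns batch draining, spill handling, and metrics, while subclasses only decide how to build a JoinGatherer for each stream batch. The reason hasNext/next stay in the base class is that one gatherer can produce several output batches, each capped at targetSize. A schematic of that drain loop with simplified types (not the plugin's actual classes):

    // One gatherer holding 10 output rows, drained in chunks of at most 4:
    final class ChunkedGatherer(totalRows: Long) {
      private var gathered = 0L
      def isDone: Boolean = gathered >= totalRows
      def gatherNext(maxRows: Long): Long = {
        val n = math.min(maxRows, totalRows - gathered) // rows in this output batch
        gathered += n
        n
      }
    }

    val g = new ChunkedGatherer(10)
    val batchRows = Iterator.continually(g).takeWhile(!_.isDone).map(_.gatherNext(4)).toList
    // batchRows == List(4, 4, 2)
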
+/**
+ * An iterator that does a cross join against a stream of batches.
+ */
+class CrossJoinIterator(
+    builtBatch: LazySpillableColumnarBatch,
+    stream: Iterator[LazySpillableColumnarBatch],
+    targetSize: Long,
+    buildSide: GpuBuildSide,
+    private val joinTime: GpuMetric,
+    private val totalTime: GpuMetric)
+    extends NestedLoopJoinIterator(
+      Cross,
+      builtBatch,
+      stream,
+      targetSize,
+      joinTime,
+      totalTime) {
+
+  override def makeGatherer(streamBatch: LazySpillableColumnarBatch): Option[JoinGatherer] = {
+    // Don't close the built side because it will be used for each stream and closed
+    // when the iterator is done.
+    val (leftBatch, rightBatch) = buildSide match {
+      case GpuBuildLeft => (LazySpillableColumnarBatch.spillOnly(builtBatch), streamBatch)
+      case GpuBuildRight => (streamBatch, LazySpillableColumnarBatch.spillOnly(builtBatch))
+    }
+
+    val leftMap = LazySpillableGatherMap.leftCross(leftBatch.numRows, rightBatch.numRows)
+    val rightMap = LazySpillableGatherMap.rightCross(leftBatch.numRows, rightBatch.numRows)
+
+    val joinGatherer = (leftBatch.numCols, rightBatch.numCols) match {
+      case (_, 0) =>
+        rightBatch.close()
+        rightMap.close()
+        JoinGatherer(leftMap, leftBatch)
+      case (0, _) =>
+        leftBatch.close()
+        leftMap.close()
+        JoinGatherer(rightMap, rightBatch)
+      case (_, _) => JoinGatherer(leftMap, leftBatch, rightMap, rightBatch)
+    }
+    if (joinGatherer.isDone) {
+      joinGatherer.close()
+      None
+    } else {
+      Some(joinGatherer)
+    }
+  }
+}
+
+class ConditionalSemiOrAntiJoinIterator(
+    joinType: JoinType,
+    builtBatch: LazySpillableColumnarBatch,
+    stream: Iterator[LazySpillableColumnarBatch],
+    targetSize: Long,
+    condition: ast.CompiledExpression,
+    spillCallback: SpillCallback,
+    joinTime: GpuMetric,
+    totalTime: GpuMetric)
+    extends NestedLoopJoinIterator(
+      joinType,
+      builtBatch,
+      stream,
+      targetSize,
+      joinTime,
+      totalTime) {
+  private[this] var compiledExpr: Option[ast.CompiledExpression] = Some(condition)
+  TaskContext.get().addTaskCompletionListener[Unit](_ => close())
+
+  override def makeGatherer(streamBatch: LazySpillableColumnarBatch): Option[JoinGatherer] = {
+    withResource(GpuColumnVector.from(streamBatch.getBatch)) { leftTable =>
+      withResource(GpuColumnVector.from(builtBatch.getBatch)) { rightTable =>
+        val map = joinType match {
+          case LeftSemi =>
+            compiledExpr.map(leftTable.conditionalLeftSemiJoinGatherMap(rightTable, _, false))
+                .getOrElse(leftTable.leftSemiJoinGatherMap(rightTable, false))
+          case LeftAnti =>
+            compiledExpr.map(leftTable.conditionalLeftAntiJoinGatherMap(rightTable, _, false))
+                .getOrElse(leftTable.leftAntiJoinGatherMap(rightTable, false))
+          case _ =>
+            throw new IllegalStateException(s"Unexpected join type $joinType")
+        }
+        withResource(map) { map =>
+          val lazyMap = LazySpillableGatherMap(map, spillCallback, "left_map")
+          val gatherer = JoinGatherer(lazyMap, streamBatch)
+          if (gatherer.isDone) {
+            gatherer.close()
+            None
+          } else {
+            Some(gatherer)
+          }
+        }
+      }
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    compiledExpr.foreach(_.close())
+    compiledExpr = None
+  }
+}
+
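Note that unlike the cross join case, the semi/anti gather map is single-sided: it holds row indices into the stream-side (left) batch only, because these join types never emit build-side columns, which is why only one LazySpillableGatherMap is created and JoinGatherer is handed just the stream batch. A CPU-side illustration of the map contents (plain collections, not plugin code), using the condition from the tests, b >= r_b:

    val b  = Vector(1, 5, 10)  // stream-side column
    val rB = Vector(4, 8)      // broadcast-side column
    val semiGatherMap = b.indices.filter(i => rB.exists(b(i) >= _))     // Vector(1, 2)
    val antiGatherMap = b.indices.filterNot(i => rB.exists(b(i) >= _))  // Vector(0)
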
 object GpuBroadcastNestedLoopJoinExecBase extends Arm {
-  def innerLikeJoin(
+  def nestedLoopJoin(
+      joinType: JoinType,
+      numFirstTableColumns: Int,
       builtBatch: LazySpillableColumnarBatch,
       stream: Iterator[LazySpillableColumnarBatch],
       targetSize: Long,
       buildSide: GpuBuildSide,
-      boundCondition: Option[Expression],
+      boundCondition: Option[GpuExpression],
+      spillCallback: SpillCallback,
       numOutputRows: GpuMetric,
       joinOutputRows: GpuMetric,
       numOutputBatches: GpuMetric,
       joinTime: GpuMetric,
       totalTime: GpuMetric): Iterator[ColumnarBatch] = {
-    val joinIterator =
+    val joinIterator = if (boundCondition.isEmpty) {
+      // Semi and anti nested loop joins without a condition are degenerate joins and should have
+      // been handled at a higher level rather than calling this method.
+      assert(joinType.isInstanceOf[InnerLike], s"Unexpected unconditional join type: $joinType")
       new CrossJoinIterator(builtBatch, stream, targetSize, buildSide, joinTime, totalTime)
-    if (boundCondition.isDefined) {
-      throw new IllegalStateException("GPU does not support conditional joins")
     } else {
-      joinIterator.map { cb =>
+      val compiledAst = boundCondition.get.convertToAst(numFirstTableColumns) match {
+        case e: ast.Expression => e.compile()
+        case e => new ast.UnaryExpression(ast.UnaryOperator.IDENTITY, e).compile()
+      }
+      joinType match {
+        case LeftAnti | LeftSemi =>
+          assert(buildSide == GpuBuildRight)
+          new ConditionalSemiOrAntiJoinIterator(joinType, builtBatch, stream, targetSize,
+            compiledAst, spillCallback, joinTime, totalTime)
+        case _ =>
+          throw new UnsupportedOperationException("not supported yet")
+      }
+    }
+    joinIterator.map { cb =>
       joinOutputRows += cb.numRows()
       numOutputRows += cb.numRows()
       numOutputBatches += 1
       cb
-      }
     }
   }
 
@@ -254,6 +368,8 @@ object GpuBroadcastNestedLoopJoinExecBase extends Arm {
       }
       numOutputRows += ret.numRows()
       numOutputBatches += 1
+      // grab the semaphore for downstream processing
+      GpuSemaphore.acquireIfNecessary(TaskContext.get())
       ret
     })
   }
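For reference, here is roughly what the compiled condition corresponds to for the expression the tests use, left.b >= right.r_b, before it is compiled once per task above. The BinaryExpression/BinaryOperator names and the column index are assumptions (inferred from the ast.UnaryExpression usage this patch already relies on), so treat this as a sketch rather than the exact output of convertToAst:

    import ai.rapids.cudf.ast

    // Assumed shape of the AST for left.b >= right.r_b (b/r_b taken as column 1):
    val gteCondition = new ast.BinaryExpression(
      ast.BinaryOperator.GREATER_EQUAL,
      new ast.ColumnReference(1, ast.TableReference.LEFT),
      new ast.ColumnReference(1, ast.TableReference.RIGHT))

    val compiled = gteCondition.compile() // compiled once per task, as above
    try {
      // ... pass `compiled` to the conditional join gather-map calls per batch
    } finally {
      compiled.close() // CompiledExpression wraps native state; close exactly once
    }
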
@@ -350,30 +466,95 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
   }
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
-    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
-    val totalTime = gpuLongMetric(TOTAL_TIME)
-    val joinTime = gpuLongMetric(JOIN_TIME)
-    val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
-
-    val boundCondition = condition.map(GpuBindReferences.bindGpuReference(_, output))
-
-    val buildTime = gpuLongMetric(BUILD_TIME)
-    val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
-
-    joinType match {
-      case _: InnerLike => // The only thing we support right now
-      case _ => throw new IllegalArgumentException(s"$joinType + $getGpuBuildSide is not" +
-          " supported and should be run on the CPU")
+    // Determine which table will be first in the join and bind the references accordingly
+    // so the AST column references match the appropriate table.
+    val (firstTable, secondTable) = joinType match {
+      case RightOuter => (right, left)
+      case _ => (left, right)
+    }
+    val numFirstTableColumns = firstTable.output.size
+    val boundCondition = condition.map {
+      GpuBindReferences.bindGpuReference(_, firstTable.output ++ secondTable.output)
     }
 
     val broadcastRelation =
       broadcastExchange.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
 
+    if (boundCondition.isEmpty) {
+      doUnconditionalJoin(broadcastRelation, numFirstTableColumns)
+    } else {
+      doConditionalJoin(broadcastRelation, boundCondition, numFirstTableColumns)
+    }
+  }
+
+  private def doUnconditionalJoin(
+      broadcastRelation: Broadcast[SerializeConcatHostBuffersDeserializeBatch],
+      numFirstTableColumns: Int): RDD[ColumnarBatch] = {
     if (output.isEmpty) {
-      assert(boundCondition.isEmpty)
+      doUnconditionalJoinRowCount(broadcastRelation)
+    } else {
+      val nestedLoopJoinType = joinType
+      val buildTime = gpuLongMetric(BUILD_TIME)
+      val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
+      val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
+      val joinTime = gpuLongMetric(JOIN_TIME)
+      val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
+      val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
+      val totalTime = gpuLongMetric(TOTAL_TIME)
+      lazy val builtBatch = makeBuiltBatch(broadcastRelation, buildTime, buildDataSize)
+      nestedLoopJoinType match {
+        case LeftSemi =>
+          // just return the left table
+          left.executeColumnar().mapPartitions { leftIter =>
+            leftIter.map { cb =>
+              joinOutputRows += cb.numRows()
+              numOutputRows += cb.numRows()
+              numOutputBatches += 1
+              cb
+            }
+          }
+        case LeftAnti =>
+          // degenerate case, no rows are returned.
+          left.executeColumnar().mapPartitions { _ =>
+            Iterator.single(new ColumnarBatch(Array(), 0))
+          }
+        case _ =>
+          val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
+          streamed.executeColumnar().mapPartitions { streamedIter =>
+            val lazyStream = streamedIter.map { cb =>
+              withResource(cb) { cb =>
+                LazySpillableColumnarBatch(cb, spillCallback, "stream_batch")
+              }
+            }
+            GpuBroadcastNestedLoopJoinExecBase.nestedLoopJoin(
+              nestedLoopJoinType, numFirstTableColumns,
+              LazySpillableColumnarBatch(builtBatch, spillCallback, "built_batch"),
+              lazyStream, targetSizeBytes, getGpuBuildSide, None,
+              spillCallback, numOutputRows, joinOutputRows, numOutputBatches,
+              joinTime, totalTime)
+          }
+      }
+    }
+  }
 
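The two degenerate branches above rely on the semantics of an always-true condition: every left row matches a non-empty broadcast side, so LeftSemi reduces to passing the left table through unchanged and LeftAnti produces no rows at all. Schematically (plain sequences; note the code assumes a non-empty build side):

    val leftRows = Seq("a", "b", "c")
    val unconditionalSemi = leftRows           // every left row has a match
    val unconditionalAnti = Seq.empty[String]  // no left row survives the anti join
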
-    lazy val buildCount: Int = computeBuildRowCount(broadcastRelation, buildTime, buildDataSize)
+  /** Special-case handling of an unconditional join that just needs to output a row count. */
+  private def doUnconditionalJoinRowCount(
+      broadcastRelation: Broadcast[SerializeConcatHostBuffersDeserializeBatch]
+  ): RDD[ColumnarBatch] = {
+    if (joinType == LeftAnti) {
+      // degenerate case, no rows are returned.
+      left.executeColumnar().mapPartitions { _ =>
+        Iterator.single(new ColumnarBatch(Array(), 0))
+      }
+    } else {
+      lazy val buildCount = if (joinType == LeftSemi) {
+        // one-to-one mapping from input rows to output rows
+        1
+      } else {
+        val buildTime = gpuLongMetric(BUILD_TIME)
+        val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
+        computeBuildRowCount(broadcastRelation, buildTime, buildDataSize)
+      }
 
     def getRowCountAndClose(cb: ColumnarBatch): Long = {
       val ret = cb.numRows()
@@ -382,29 +563,44 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
       ret
     }
 
+    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
+    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
     val counts = streamed.executeColumnar().map(getRowCountAndClose)
     GpuBroadcastNestedLoopJoinExecBase.divideIntoBatches(
       counts.map(s => s * buildCount),
       targetSizeBytes,
       numOutputRows,
       numOutputBatches)
-    } else {
-      lazy val builtBatch: ColumnarBatch =
-        makeBuiltBatch(broadcastRelation, buildTime, buildDataSize)
-      val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
-      streamed.executeColumnar().mapPartitions { streamedIter =>
-        val lazyStream = streamedIter.map { cb =>
-          withResource(cb) { cb =>
-            LazySpillableColumnarBatch(cb, spillCallback, "stream_batch")
-          }
+  }
+
+  private def doConditionalJoin(
+      broadcastRelation: Broadcast[SerializeConcatHostBuffersDeserializeBatch],
+      boundCondition: Option[GpuExpression],
+      numFirstTableColumns: Int): RDD[ColumnarBatch] = {
+    val buildTime = gpuLongMetric(BUILD_TIME)
+    val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
+    lazy val builtBatch = makeBuiltBatch(broadcastRelation, buildTime, buildDataSize)
+
+    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
+    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
+    val totalTime = gpuLongMetric(TOTAL_TIME)
+    val joinTime = gpuLongMetric(JOIN_TIME)
+    val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
+    val nestedLoopJoinType = joinType
+    val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
+    streamed.executeColumnar().mapPartitions { streamedIter =>
+      val lazyStream = streamedIter.map { cb =>
+        withResource(cb) { cb =>
+          LazySpillableColumnarBatch(cb, spillCallback, "stream_batch")
         }
-        GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
-          LazySpillableColumnarBatch(builtBatch, spillCallback, "built_batch"),
-          lazyStream, targetSizeBytes, getGpuBuildSide, boundCondition,
-          numOutputRows, joinOutputRows, numOutputBatches,
-          joinTime, totalTime)
       }
+      GpuBroadcastNestedLoopJoinExecBase.nestedLoopJoin(
+        nestedLoopJoinType, numFirstTableColumns,
+        LazySpillableColumnarBatch(builtBatch, spillCallback, "built_batch"),
+        lazyStream, targetSizeBytes, getGpuBuildSide, boundCondition,
+        spillCallback, numOutputRows, joinOutputRows, numOutputBatches,
+        joinTime, totalTime)
     }
   }
 }
-
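Finally, a sketch of the arithmetic behind the COUNT(*) fast path in doUnconditionalJoinRowCount: no join output is ever materialized; each streamed batch of n rows simply contributes n * buildCount rows to the total, and divideIntoBatches splits those counts into target-size output batches. For example:

    val streamBatchRows = Seq(50L, 25L)
    val buildCount = 3L // broadcast row count; 1 for LeftSemi per the code above
    val perBatchOutputRows = streamBatchRows.map(_ * buildCount) // Seq(150, 75)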