diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala index 7ddc969d919..578c1106eb1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala @@ -21,6 +21,7 @@ import ai.rapids.cudf.{ast, GatherMap, NvtxColor, OutOfBoundsPolicy, Scalar, Tab import ai.rapids.cudf.ast.CompiledExpression import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.shims.{GpuBroadcastJoinMeta, ShimBinaryExecNode} @@ -35,7 +36,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec -import org.apache.spark.sql.types.BooleanType +import org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} abstract class GpuBroadcastNestedLoopJoinMetaBase( @@ -562,7 +563,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( condition.forall { case GpuLiteral(true, BooleanType) => // Spark can generate a degenerate conditional join when the join keys are constants - output.isEmpty + true case GpuAlias(e: GpuExpression, _) => isUnconditionalJoin(Some(e)) case _ => false } @@ -582,23 +583,12 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( val broadcastRelation = getBroadcastRelation() - val joinCondition = boundCondition.orElse { - // For outer joins use a true condition if there are any columns in the build side - // otherwise use a cross join. - val useTrueCondition = joinType match { - case LeftOuter if gpuBuildSide == GpuBuildRight => right.output.nonEmpty - case RightOuter if gpuBuildSide == GpuBuildLeft => left.output.nonEmpty - case _ => false - } - if (useTrueCondition) Some(GpuLiteral(true)) else None - } - // Sometimes Spark specifies a true condition for a row-count-only join. // This can happen when the join keys are detected to be constant. - if (isUnconditionalJoin(joinCondition)) { + if (isUnconditionalJoin(boundCondition)) { doUnconditionalJoin(broadcastRelation) } else { - doConditionalJoin(broadcastRelation, joinCondition, numFirstTableColumns) + doConditionalJoin(broadcastRelation, boundCondition, numFirstTableColumns) } } @@ -627,6 +617,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( val buildTime = gpuLongMetric(BUILD_TIME) val opTime = gpuLongMetric(OP_TIME) val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE) + val localJoinType = joinType // NOTE: this is a def because we want a brand new `ColumnarBatch` to be returned // per partition (task), since each task is going to be taking ownership // of a columnar batch via `LazySpillableColumnarBatch`. @@ -664,13 +655,23 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( val spillableBuiltBatch = withResource(builtBatch) { LazySpillableColumnarBatch(_, "built_batch") } - new CrossJoinIterator( - spillableBuiltBatch, - lazyStream, - targetSizeBytes, - buildSide, - opTime = opTime, - joinTime = joinTime) + + localJoinType match { + case LeftOuter if spillableBuiltBatch.numRows == 0 => + new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes, + true) + case RightOuter if spillableBuiltBatch.numRows == 0 => + new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes, + false) + case _ => + new CrossJoinIterator( + spillableBuiltBatch, + lazyStream, + targetSizeBytes, + buildSide, + opTime = opTime, + joinTime = joinTime) + } } } joinIterator.map { cb => @@ -825,3 +826,30 @@ class ConditionalNestedLoopExistenceJoinIterator( } } } + +/** Iterator for producing batches from an outer join where the build-side table is empty. */ +class EmptyOuterNestedLoopJoinIterator( + streamIter: Iterator[ColumnarBatch], + buildTypes: Array[DataType], + isStreamFirst: Boolean) extends Iterator[ColumnarBatch] { + override def hasNext: Boolean = streamIter.hasNext + + override def next(): ColumnarBatch = { + withResource(streamIter.next()) { streamBatch => + withResource(buildNullBatch(streamBatch.numRows())) { nullBatch => + if (isStreamFirst) { + GpuColumnVector.combineColumns(streamBatch, nullBatch) + } else { + GpuColumnVector.combineColumns(nullBatch, streamBatch) + } + } + } + } + + private def buildNullBatch(numRows: Int): ColumnarBatch = { + val cols: Array[ColumnVector] = buildTypes.safeMap { dt => + GpuColumnVector.fromNull(numRows, dt) + } + new ColumnarBatch(cols, numRows) + } +}