Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix degenerate conditional nested loop join detection [databricks] #11268

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ai.rapids.cudf.{ast, GatherMap, NvtxColor, OutOfBoundsPolicy, Scalar, Tab
import ai.rapids.cudf.ast.CompiledExpression
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.shims.{GpuBroadcastJoinMeta, ShimBinaryExecNode}

Expand All @@ -35,7 +36,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.{BooleanType, DataType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

abstract class GpuBroadcastNestedLoopJoinMetaBase(
Expand Down Expand Up @@ -562,7 +563,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
condition.forall {
case GpuLiteral(true, BooleanType) =>
// Spark can generate a degenerate conditional join when the join keys are constants
output.isEmpty
true
case GpuAlias(e: GpuExpression, _) => isUnconditionalJoin(Some(e))
case _ => false
}
Expand All @@ -582,23 +583,12 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(

val broadcastRelation = getBroadcastRelation()

val joinCondition = boundCondition.orElse {
// For outer joins use a true condition if there are any columns in the build side
// otherwise use a cross join.
val useTrueCondition = joinType match {
case LeftOuter if gpuBuildSide == GpuBuildRight => right.output.nonEmpty
case RightOuter if gpuBuildSide == GpuBuildLeft => left.output.nonEmpty
case _ => false
}
if (useTrueCondition) Some(GpuLiteral(true)) else None
}

// Sometimes Spark specifies a true condition for a row-count-only join.
// This can happen when the join keys are detected to be constant.
if (isUnconditionalJoin(joinCondition)) {
if (isUnconditionalJoin(boundCondition)) {
doUnconditionalJoin(broadcastRelation)
} else {
doConditionalJoin(broadcastRelation, joinCondition, numFirstTableColumns)
doConditionalJoin(broadcastRelation, boundCondition, numFirstTableColumns)
}
}

Expand Down Expand Up @@ -627,6 +617,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
val buildTime = gpuLongMetric(BUILD_TIME)
val opTime = gpuLongMetric(OP_TIME)
val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
val localJoinType = joinType
// NOTE: this is a def because we want a brand new `ColumnarBatch` to be returned
// per partition (task), since each task is going to be taking ownership
// of a columnar batch via `LazySpillableColumnarBatch`.
Expand Down Expand Up @@ -664,13 +655,23 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
val spillableBuiltBatch = withResource(builtBatch) {
LazySpillableColumnarBatch(_, "built_batch")
}
new CrossJoinIterator(
spillableBuiltBatch,
lazyStream,
targetSizeBytes,
buildSide,
opTime = opTime,
joinTime = joinTime)

localJoinType match {
case LeftOuter if spillableBuiltBatch.numRows == 0 =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to worry about a full outer join?

Copy link
Contributor Author

@jlowe jlowe Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future yes, but we do not support FullOuter joins for broadcasted nested loop joins. Support for that is tracked by #3269.

new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
true)
case RightOuter if spillableBuiltBatch.numRows == 0 =>
new EmptyOuterNestedLoopJoinIterator(streamedIter, spillableBuiltBatch.dataTypes,
false)
case _ =>
new CrossJoinIterator(
spillableBuiltBatch,
lazyStream,
targetSizeBytes,
buildSide,
opTime = opTime,
joinTime = joinTime)
}
}
}
joinIterator.map { cb =>
Expand Down Expand Up @@ -825,3 +826,30 @@ class ConditionalNestedLoopExistenceJoinIterator(
}
}
}

/** Iterator for producing batches from an outer join where the build-side table is empty. */
class EmptyOuterNestedLoopJoinIterator(
streamIter: Iterator[ColumnarBatch],
buildTypes: Array[DataType],
isStreamFirst: Boolean) extends Iterator[ColumnarBatch] {
override def hasNext: Boolean = streamIter.hasNext

override def next(): ColumnarBatch = {
withResource(streamIter.next()) { streamBatch =>
withResource(buildNullBatch(streamBatch.numRows())) { nullBatch =>
if (isStreamFirst) {
GpuColumnVector.combineColumns(streamBatch, nullBatch)
} else {
GpuColumnVector.combineColumns(nullBatch, streamBatch)
}
}
}
}

private def buildNullBatch(numRows: Int): ColumnarBatch = {
val cols: Array[ColumnVector] = buildTypes.safeMap { dt =>
GpuColumnVector.fromNull(numRows, dt)
}
new ColumnarBatch(cols, numRows)
}
}