diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
index 8e238060f6c..97553438d04 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
@@ -105,8 +105,8 @@ object ConcatAndConsumeAll {
 
 object CoalesceGoal {
   def maxRequirement(a: CoalesceGoal, b: CoalesceGoal): CoalesceGoal = (a, b) match {
-    case (RequireSingleBatch, _) => a
-    case (_, RequireSingleBatch) => b
+    case (_: RequireSingleBatchLike, _) => a
+    case (_, _: RequireSingleBatchLike) => b
     case (_: BatchedByKey, _: TargetSize) => a
     case (_: TargetSize, _: BatchedByKey) => b
     case (a: BatchedByKey, b: BatchedByKey) =>
@@ -121,8 +121,8 @@ object CoalesceGoal {
   }
 
   def minProvided(a: CoalesceGoal, b:CoalesceGoal): CoalesceGoal = (a, b) match {
-    case (RequireSingleBatch, _) => b
-    case (_, RequireSingleBatch) => a
+    case (_: RequireSingleBatchLike, _) => b
+    case (_, _: RequireSingleBatchLike) => a
     case (_: BatchedByKey, _: TargetSize) => b
     case (_: TargetSize, _: BatchedByKey) => a
     case (a: BatchedByKey, b: BatchedByKey) =>
@@ -136,8 +136,8 @@ object CoalesceGoal {
   }
 
   def satisfies(found: CoalesceGoal, required: CoalesceGoal): Boolean = (found, required) match {
-    case (RequireSingleBatch, _) => true
-    case (_, RequireSingleBatch) => false
+    case (_: RequireSingleBatchLike, _) => true
+    case (_, _: RequireSingleBatchLike) => false
     case (_: BatchedByKey, _: TargetSize) => true
     case (_: TargetSize, _: BatchedByKey) => false
     case (BatchedByKey(aOrder), BatchedByKey(bOrder)) =>
@@ -167,12 +167,17 @@ sealed abstract class CoalesceSizeGoal extends CoalesceGoal {
 }
 
 /**
- * A single batch is required as the input to a note in the SparkPlan. This means
+ * Marker trait used to pattern match on single-batch coalesce goals.
+ */
+trait RequireSingleBatchLike
+
+/**
+ * A single batch is required as the input to a node in the SparkPlan. This means
  * all of the data for a given task is in a single batch. This should be avoided
  * as much as possible because it can result in running out of memory or run into
  * limitations of the batch size by both Spark and cudf.
  */
-case object RequireSingleBatch extends CoalesceSizeGoal {
+case object RequireSingleBatch extends CoalesceSizeGoal with RequireSingleBatchLike {
 
   override val targetSizeBytes: Long = Long.MaxValue
 
@@ -180,6 +185,23 @@ case object RequireSingleBatch extends CoalesceSizeGoal {
   override def toString: String = "RequireSingleBatch"
 }
 
+/**
+ * This is exactly the same as `RequireSingleBatch`, except that if the batch
+ * would fail to coalesce because it hits the cuDF row-count limit, the coalesce
+ * code is free to filter out null-keyed rows using the expression in `filterExpression`.
+ * @note This is an ugly hack because ideally these rows are never read from the input source,
+ *       given that we normally push down IsNotNull in Spark. This should be removed when
+ *       we can handle this in a proper way, likely at the logical plan optimization level.
+ *       More details here: https://issues.apache.org/jira/browse/SPARK-39131
+ */
+case class RequireSingleBatchWithFilter(filterExpression: GpuExpression)
+    extends CoalesceSizeGoal with RequireSingleBatchLike {
+
+  override val targetSizeBytes: Long = Long.MaxValue
+
+  /** Override toString to improve readability of Spark explain output */
+  override def toString: String = "RequireSingleBatchWithFilter"
+}
 /**
  * Produce a stream of batches that are at most the given size in bytes. The size
  * is estimated in some cases so it may go over a little, but it should generally be
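
Illustrative sketch (not part of the patch): the marker trait above lets every "does this goal
require exactly one batch?" check match on `RequireSingleBatchLike` instead of comparing against
the `RequireSingleBatch` singleton, so the new filtered goal is picked up automatically. The
standalone Scala below models the idea with simplified stand-in classes, not the real
spark-rapids types:

object RequireSingleBatchLikeSketch extends App {
  // Simplified stand-ins for the goal hierarchy defined in GpuCoalesceBatches.scala.
  sealed trait CoalesceGoal
  trait RequireSingleBatchLike
  case object RequireSingleBatch extends CoalesceGoal with RequireSingleBatchLike
  case class RequireSingleBatchWithFilter(filter: String)
      extends CoalesceGoal with RequireSingleBatchLike
  case class TargetSize(bytes: Long) extends CoalesceGoal

  // Matching on the marker trait covers both single-batch goals at once.
  def needsSingleBatch(goal: CoalesceGoal): Boolean = goal match {
    case _: RequireSingleBatchLike => true
    case _ => false
  }

  assert(needsSingleBatch(RequireSingleBatch))
  assert(needsSingleBatch(RequireSingleBatchWithFilter("isnotnull(key)")))
  assert(!needsSingleBatch(TargetSize(1L << 30)))
}
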
@@ -228,6 +250,12 @@ abstract class AbstractGpuCoalesceIterator(
 
   private var batchInitialized: Boolean = false
 
+  /**
+   * This is defined iff `goal` is `RequireSingleBatchWithFilter` and we have
+   * reached the cuDF row-count limit.
+   */
+  private var inputFilterExpression: Option[Expression] = None
+
   /**
    * Return true if there is something saved on deck for later processing.
    */
@@ -351,7 +379,17 @@
 
     // there is a hard limit of 2^31 rows
     while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) {
-      closeOnExcept(iter.next()) { cb =>
+      val cbFromIter = iter.next()
+
+      var cb = if (inputFilterExpression.isDefined) {
+        // Once the cuDF row-count limit has been hit, proactively filter every
+        // subsequent batch before it is considered.
+        GpuFilter(cbFromIter, inputFilterExpression.get)
+      } else {
+        cbFromIter
+      }
+
+      closeOnExcept(cb) { _ =>
         val nextRows = cb.numRows()
         numInputBatches += 1
@@ -366,12 +404,40 @@
         val wouldBeBytes = numBytes + nextBytes
 
         if (wouldBeRows > Int.MaxValue) {
-          if (goal == RequireSingleBatch) {
-            throw new IllegalStateException("A single batch is required for this operation," +
+          goal match {
+            case RequireSingleBatch =>
+              throw new IllegalStateException("A single batch is required for this operation," +
               s" but cuDF only supports ${Int.MaxValue} rows. At least $wouldBeRows" +
               s" are in this partition. Please try increasing your partition count.")
+            case RequireSingleBatchWithFilter(filterExpression) =>
+              // filter what we had already stored
+              val filteredDown = GpuFilter(concatAllAndPutOnGPU(), filterExpression)
+              closeOnExcept(filteredDown) { _ =>
+                // filter the incoming batch as well
+                closeOnExcept(GpuFilter(cb, filterExpression)) { filteredCb =>
+                  cb = null // null out `cb` to prevent multiple close calls
+                  val filteredWouldBeRows = filteredDown.numRows().toLong + filteredCb.numRows()
+                  if (filteredWouldBeRows > Int.MaxValue) {
+                    throw new IllegalStateException(
+                      "A single batch is required for this operation, but cuDF only supports " +
+                      s"${Int.MaxValue} rows. At least $filteredWouldBeRows are in this " +
+                      "partition, even after filtering nulls. " +
+                      "Please try increasing your partition count.")
+                  }
+                  if (inputFilterExpression.isEmpty) {
+                    inputFilterExpression = Some(filterExpression)
+                    logWarning("Switched to null-filtering mode. This coalesce iterator " +
+                      "succeeded in fitting rows under the cuDF limit only after null filtering. " +
+                      "Please try increasing your partition count.")
+                  }
+                  numRows = filteredWouldBeRows
+                  numBytes = getBatchDataSize(filteredDown) + getBatchDataSize(filteredCb)
+                  addBatch(filteredDown)
+                  addBatch(filteredCb)
+                }
+              }
+            case _ => saveOnDeck(cb) // not a single batch requirement
           }
-          saveOnDeck(cb)
         } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
           saveOnDeck(cb)
         } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
@@ -394,9 +460,13 @@
     val isLastBatch = !(hasOnDeck || iter.hasNext)
 
     // enforce single batch limit when appropriate
-    if (goal == RequireSingleBatch && !isLastBatch) {
-      throw new IllegalStateException("A single batch is required for this operation." +
-        " Please try increasing your partition count.")
+    if (!isLastBatch) {
+      goal match {
+        case _: RequireSingleBatchLike =>
+          throw new IllegalStateException("A single batch is required for this operation." +
+            " Please try increasing your partition count.")
+        case _ =>
+      }
     }
 
     numOutputRows += numRows
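
Illustrative sketch (not part of the patch): the iterator above only starts null filtering after
the cuDF row-count limit has been hit once; from then on `inputFilterExpression` is set and every
later batch is filtered up front. The self-contained toy below models that control flow with plain
Scala collections standing in for `ColumnarBatch` and `GpuFilter` (every name in it is made up for
the sketch):

object NullFilterFallbackSketch extends App {
  // Toy stand-ins: a "batch" is a sequence of optional keys, the "row limit" is tiny,
  // and the "filter" simply drops None keys, mirroring the null filter in the patch.
  type Batch = Seq[Option[Int]]
  val rowLimit = 4

  var filterEnabled = false
  val out = scala.collection.mutable.ArrayBuffer[Option[Int]]()

  def addBatch(batch: Batch): Unit = {
    // Once the limit has been hit, every later batch is filtered proactively,
    // like inputFilterExpression in AbstractGpuCoalesceIterator.
    val b = if (filterEnabled) batch.filter(_.isDefined) else batch
    if (out.size + b.size > rowLimit) {
      // First overflow: filter what was already collected plus the incoming batch,
      // then stay in filtering mode for the rest of the input.
      val filteredStored = out.filter(_.isDefined)
      val filteredIncoming = b.filter(_.isDefined)
      require(filteredStored.size + filteredIncoming.size <= rowLimit,
        "still too many rows, even after null filtering")
      out.clear()
      out ++= filteredStored ++= filteredIncoming
      filterEnabled = true
    } else {
      out ++= b
    }
  }

  Seq(Seq(Some(1), None, Some(2)), Seq(None, Some(3)), Seq(Some(4), None)).foreach(addBatch)
  println(out) // ArrayBuffer(Some(1), Some(2), Some(3), Some(4))
}
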
" + + "Please try increasing your partition count.") + } + numRows = filteredWouldBeRows + numBytes = getBatchDataSize(filteredDown) + getBatchDataSize(filteredCb) + addBatch(filteredDown) + addBatch(filteredCb) + } + } + case _ => saveOnDeck(cb) // not a single batch requirement } - saveOnDeck(cb) } else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) { saveOnDeck(cb) } else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) { @@ -394,9 +460,13 @@ abstract class AbstractGpuCoalesceIterator( val isLastBatch = !(hasOnDeck || iter.hasNext) // enforce single batch limit when appropriate - if (goal == RequireSingleBatch && !isLastBatch) { - throw new IllegalStateException("A single batch is required for this operation." + - " Please try increasing your partition count.") + if (!isLastBatch) { + goal match { + case _: RequireSingleBatchLike => + throw new IllegalStateException("A single batch is required for this operation," + + " Please try increasing your partition count.") + case _ => + } } numOutputRows += numRows diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index dd01d0d3f58..b3497b61cca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -641,7 +641,7 @@ class RowToColumnarIterator( } // enforce RequireSingleBatch limit - if (rowIter.hasNext && localGoal == RequireSingleBatch) { + if (rowIter.hasNext && localGoal.isInstanceOf[RequireSingleBatchLike]) { throw new IllegalStateException("A single batch is required for this operation." + " Please try increasing your partition count.") } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index 7d998f8d0b4..2412350faad 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -24,11 +24,13 @@ import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType} +import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.physical.Distribution import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.rapids.GpuOr import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks} +import org.apache.spark.sql.types.BooleanType import org.apache.spark.sql.vectorized.ColumnarBatch class GpuShuffledHashJoinMeta( @@ -117,10 +119,24 @@ case class GpuShuffledHashJoinExec( "GpuShuffledHashJoin does not support the execute() code path.") } + // Goal to be used for the coalescing the build side. Note that this is internal to + // the join and not used for planning purposes. The two valid choices are `RequireSingleBatch` or + // `RequireSingleBatchWithFilter` + private lazy val buildGoal: CoalesceSizeGoal = joinType match { + case _: InnerLike | LeftSemi | LeftAnti => + val nullFilteringMask = boundBuildKeys.map { bk => + // coalesce(key1, false) or coalesce(key2, false) ... or ... 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
index 7d998f8d0b4..2412350faad 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
@@ -24,11 +24,12 @@ import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.rapids.{GpuAnd, GpuIsNotNull}
 import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class GpuShuffledHashJoinMeta(
@@ -117,10 +119,24 @@
       "GpuShuffledHashJoin does not support the execute() code path.")
   }
 
+  // Goal to be used for coalescing the build side. Note that this is internal to
+  // the join and not used for planning purposes. The two valid choices are
+  // `RequireSingleBatch` and `RequireSingleBatchWithFilter`.
+  private lazy val buildGoal: CoalesceSizeGoal = joinType match {
+    case _: InnerLike | LeftSemi | LeftAnti =>
+      val nullFilteringMask = boundBuildKeys.map { bk =>
+        // isnotnull(key1) and isnotnull(key2) ... and ... isnotnull(keyN)
+        // Any row with a null join key cannot match, so this mask removes those rows.
+        GpuIsNotNull(bk)
+      }.reduce(GpuAnd)
+      RequireSingleBatchWithFilter(nullFilteringMask)
+    case _ => RequireSingleBatch
+  }
+
   override def childrenCoalesceGoal: Seq[CoalesceGoal] = (joinType, buildSide) match {
     case (FullOuter, _) => Seq(RequireSingleBatch, RequireSingleBatch)
-    case (_, GpuBuildLeft) => Seq(RequireSingleBatch, null)
-    case (_, GpuBuildRight) => Seq(null, RequireSingleBatch)
+    case (_, GpuBuildLeft) => Seq(buildGoal, null)
+    case (_, GpuBuildRight) => Seq(null, buildGoal)
   }
 
   override def doExecuteColumnar() : RDD[ColumnarBatch] = {
@@ -149,6 +165,7 @@
       (streamIter, buildIter) => {
         val (builtBatch, maybeBufferedStreamIter) =
           GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
+            buildGoal,
             batchSizeBytes,
             localBuildOutput,
             buildIter,
@@ -219,6 +236,7 @@ object GpuShuffledHashJoinExec extends Arm {
    * because we hold onto the semaphore during the entire time after realizing the goal
    * has been hit.
    *
+   * @param buildGoal the build goal to use when coalescing batches
    * @param hostTargetBatchSize target batch size goal on the host
    * @param buildOutput output attributes of the build plan
    * @param buildIter build iterator
@@ -230,6 +248,7 @@
    *        used for the join
    */
  def getBuiltBatchAndStreamIter(
+      buildGoal: CoalesceSizeGoal,
       hostTargetBatchSize: Long,
       buildOutput: Seq[Attribute],
       buildIter: Iterator[ColumnarBatch],
@@ -295,7 +314,7 @@
         }
       } else {
         val buildBatch = getBuildBatchFromUnfinished(
-          Seq(hostConcatResult).iterator ++ hostConcatIter,
+          buildGoal, Seq(hostConcatResult).iterator ++ hostConcatIter,
           buildOutput, spillCallback, coalesceMetricsMap)
         buildTime += System.nanoTime() - startTime
         (buildBatch, streamIter)
@@ -307,6 +326,7 @@
   }
 
   private def getBuildBatchFromUnfinished(
+      buildGoal: CoalesceSizeGoal,
       iterWithPrior: Iterator[HostConcatResult],
       buildOutput: Seq[Attribute],
       spillCallback: SpillCallback,
@@ -322,10 +342,9 @@
           iterWithPrior,
           dataTypes,
           coalesceMetricsMap)
-      val res = ConcatAndConsumeAll.getSingleBatchWithVerification(
+      val res = ConcatAndConsumeAll.getSingleBatchWithVerification(
         new GpuCoalesceIterator(shuffleCoalesce,
-          dataTypes,
-          RequireSingleBatch,
+          dataTypes, buildGoal,
           NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric,
           coalesceMetricsMap(GpuMetric.CONCAT_TIME),
           coalesceMetricsMap(GpuMetric.OP_TIME),
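
Illustrative sketch (not part of the patch): the build-side mask only removes rows whose join key
is null, which is safe for inner, left semi and left anti joins because a null key can never
satisfy an equi-join condition. The same idea expressed with the public DataFrame API (the column
names and values below are made up for the example):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object NullKeyFilterSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("null-key-filter-sketch").getOrCreate()
  import spark.implicits._

  val build = Seq((Some(1), "a"), (None, "b"), (Some(2), "c")).toDF("k1", "v")
  val stream = Seq((1, "x"), (2, "y")).toDF("k1", "w")

  // CPU equivalent of the isnotnull-based filter mask applied to the build side.
  val filteredBuild = build.filter(col("k1").isNotNull)

  val joined = stream.join(build, "k1").orderBy("k1", "w", "v")
  val joinedFiltered = stream.join(filteredBuild, "k1").orderBy("k1", "w", "v")

  // Dropping the null-keyed build row does not change the inner join result.
  assert(joined.collect().sameElements(joinedFiltered.collect()))

  spark.stop()
}
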
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index df3bfc1ba86..38fd2c6cf09 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -263,7 +263,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
       case (plan, null) =>
         // No coalesce requested
         insertCoalesce(plan, disableUntilInput)
-      case (plan, goal @ RequireSingleBatch) =>
+      case (plan, goal: RequireSingleBatchLike) =>
         // Even if coalesce is disabled a single batch is required to make this operator work
         // This should not cause bugs because we require a single batch in situations where
         // Spark also buffers data, so any operator that needs coalesce disabled would also
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
index 032e544b3fd..eecad64899d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
@@ -188,7 +188,7 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     opName) {
 
   // RequireSingleBatch goal is intentionally not supported in this iterator
-  assert(goal != RequireSingleBatch)
+  assert(!goal.isInstanceOf[RequireSingleBatchLike])
 
   var batchBuilder: GpuColumnVector.GpuColumnarBatchBuilderBase = _
   var totalRows = 0
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
index ac5bf51d421..2d29c7517a0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
@@ -634,7 +634,7 @@ trait GpuHashJoin extends GpuExec {
       case GpuBuildLeft => GpuExec.outputBatching(right)
       case GpuBuildRight => GpuExec.outputBatching(left)
     }
-    if (batching == RequireSingleBatch) {
+    if (batching.isInstanceOf[RequireSingleBatchLike]) {
       RequireSingleBatch
     } else {
       null
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala
index 2b72bcc272a..4d079abc079 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala
@@ -603,7 +603,7 @@ private class ExternalRowToColumnarIterator(
     }
 
     // enforce single batch limit when appropriate
-    if (rowIter.hasNext && localGoal == RequireSingleBatch) {
+    if (rowIter.hasNext && localGoal.isInstanceOf[RequireSingleBatchLike]) {
       throw new IllegalStateException("A single batch is required for this operation." +
         " Please try increasing your partition count.")
     }
+ " Please try increasing your partition count.") } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala index ee11387c9ac..f277921c7d3 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala @@ -39,6 +39,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { when(mockBuildIter.hasNext).thenReturn(false) val mockStreamIter = mock[Iterator[ColumnarBatch]] val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 0, Seq.empty, mockBuildIter, @@ -71,6 +72,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { when(buildIter.buffered).thenReturn(buildBufferedIter) val mockStreamIter = mock[Iterator[ColumnarBatch]] val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 0, Seq.empty, buildIter, @@ -103,6 +105,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { val buildIter = Seq(emptyBatch).iterator val mockStreamIter = mock[Iterator[ColumnarBatch]] val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 0, Seq.empty, buildIter, @@ -129,6 +132,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { val buildIter = Seq(batch).iterator val mockStreamIter = mock[Iterator[ColumnarBatch]] val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 0, Seq.empty, buildIter, @@ -182,6 +186,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { val buildIter = Seq(serializedBatch).iterator val attrs = AttributeReference("a", IntegerType, false)() :: Nil val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 1024, attrs, buildIter, @@ -215,6 +220,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { val buildIter = Seq(serializedBatch).iterator val attrs = AttributeReference("a", IntegerType, false)() :: Nil val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 1024, attrs, buildIter, @@ -252,6 +258,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { val buildIter = Seq(serializedBatch, serializedBatch2).iterator val attrs = AttributeReference("a", IntegerType, false)() :: Nil val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 1, attrs, buildIter, @@ -290,6 +297,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { val buildIter = Seq(serializedBatch, serializedBatch2).iterator val attrs = AttributeReference("a", IntegerType, false)() :: Nil val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + RequireSingleBatch, 1024, attrs, buildIter,