Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter rows with null keys when coalescing due to reaching cuDF row limits [databricks] #5531

Merged
merged 8 commits into from
May 23, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ object ConcatAndConsumeAll {

object CoalesceGoal {
def maxRequirement(a: CoalesceGoal, b: CoalesceGoal): CoalesceGoal = (a, b) match {
case (RequireSingleBatch, _) => a
case (_, RequireSingleBatch) => b
case (_: RequireSingleBatchLike, _) => a
case (_, _: RequireSingleBatchLike) => b
case (_: BatchedByKey, _: TargetSize) => a
case (_: TargetSize, _: BatchedByKey) => b
case (a: BatchedByKey, b: BatchedByKey) =>
Expand All @@ -121,8 +121,8 @@ object CoalesceGoal {
}

def minProvided(a: CoalesceGoal, b:CoalesceGoal): CoalesceGoal = (a, b) match {
case (RequireSingleBatch, _) => b
case (_, RequireSingleBatch) => a
case (_: RequireSingleBatchLike, _) => b
case (_, _: RequireSingleBatchLike) => a
case (_: BatchedByKey, _: TargetSize) => b
case (_: TargetSize, _: BatchedByKey) => a
case (a: BatchedByKey, b: BatchedByKey) =>
Expand All @@ -136,8 +136,8 @@ object CoalesceGoal {
}

def satisfies(found: CoalesceGoal, required: CoalesceGoal): Boolean = (found, required) match {
case (RequireSingleBatch, _) => true
case (_, RequireSingleBatch) => false
case (_: RequireSingleBatchLike, _) => true
case (_, _: RequireSingleBatchLike) => false
case (_: BatchedByKey, _: TargetSize) => true
case (_: TargetSize, _: BatchedByKey) => false
case (BatchedByKey(aOrder), BatchedByKey(bOrder)) =>
Expand Down Expand Up @@ -167,19 +167,41 @@ sealed abstract class CoalesceSizeGoal extends CoalesceGoal {
}

/**
* A single batch is required as the input to a note in the SparkPlan. This means
* Trait used for pattern matching for single batch coalesce goals.
*/
trait RequireSingleBatchLike

/**
* A single batch is required as the input to a node in the SparkPlan. This means
* all of the data for a given task is in a single batch. This should be avoided
* as much as possible because it can result in running out of memory or run into
* limitations of the batch size by both Spark and cudf.
*/
case object RequireSingleBatch extends CoalesceSizeGoal {
case object RequireSingleBatch extends CoalesceSizeGoal with RequireSingleBatchLike {

override val targetSizeBytes: Long = Long.MaxValue

/** Override toString to improve readability of Spark explain output */
override def toString: String = "RequireSingleBatch"
}

/**
* This is exactly the same as `RequireSingleBatch` except that if the
* batch would fail to coalesce because it reaches cuDF row-count limits, the
* coalesce code is free to null filter given the filter expression in `filterExpression`.
* @note This is an ugly hack because ideally these rows are never read from the input source
* given that we normally push down IsNotNull in Spark. This should be removed when
* we can handle this in a proper way, likely at the logical plan optimization level.
* More details here: https://issues.apache.org/jira/browse/SPARK-39131
*/
case class RequireSingleBatchWithFilter(filterExpression: GpuExpression)
extends CoalesceSizeGoal with RequireSingleBatchLike {

override val targetSizeBytes: Long = Long.MaxValue

/** Override toString to improve readability of Spark explain output */
override def toString: String = "RequireSingleBatchWithFilter"
}
/**
* Produce a stream of batches that are at most the given size in bytes. The size
* is estimated in some cases so it may go over a little, but it should generally be
Expand Down Expand Up @@ -228,6 +250,12 @@ abstract class AbstractGpuCoalesceIterator(

private var batchInitialized: Boolean = false

/**
* This is defined iff `goal` is `RequireSingleBatchWithFilter` and we have
* reached the cuDF row-count limit.
*/
private var inputFilterExpression: Option[Expression] = None

/**
* Return true if there is something saved on deck for later processing.
*/
Expand Down Expand Up @@ -351,7 +379,17 @@ abstract class AbstractGpuCoalesceIterator(

// there is a hard limit of 2^31 rows
while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) {
closeOnExcept(iter.next()) { cb =>
val cbFromIter = iter.next()

var cb = if (inputFilterExpression.isDefined) {
// If we have reached the cuDF limit once, proactively filter batches
// after that first limit is reached.
GpuFilter(cbFromIter, inputFilterExpression.get)
} else {
cbFromIter
}

closeOnExcept(cb) { _ =>
val nextRows = cb.numRows()
numInputBatches += 1

Expand All @@ -366,12 +404,40 @@ abstract class AbstractGpuCoalesceIterator(
val wouldBeBytes = numBytes + nextBytes

if (wouldBeRows > Int.MaxValue) {
if (goal == RequireSingleBatch) {
throw new IllegalStateException("A single batch is required for this operation," +
goal match {
case RequireSingleBatch =>
throw new IllegalStateException("A single batch is required for this operation," +
s" but cuDF only supports ${Int.MaxValue} rows. At least $wouldBeRows" +
s" are in this partition. Please try increasing your partition count.")
case RequireSingleBatchWithFilter(filterExpression) =>
// filter what we had already stored
val filteredDown = GpuFilter(concatAllAndPutOnGPU(), filterExpression)
closeOnExcept(filteredDown) { _ =>
// filter the incoming batch as well
closeOnExcept(GpuFilter(cb, filterExpression)) { filteredCb =>
cb = null // null out `cb` to prevent multiple close calls
val filteredWouldBeRows = filteredDown.numRows() + filteredCb.numRows()
if (filteredWouldBeRows > Int.MaxValue) {
throw new IllegalStateException(
"A single batch is required for this operation, but cuDF only supports " +
s"${Int.MaxValue} rows. At least $filteredWouldBeRows are in this " +
"partition, even after filtering nulls. " +
"Please try increasing your partition count.")
}
if (inputFilterExpression.isEmpty) {
inputFilterExpression = Some(filterExpression)
logWarning("Switched to null-filtering mode. This coalesce iterator " +
"succeeded to fit rows under the cuDF limit only after null filtering. " +
"Please try increasing your partition count.")
}
numRows = filteredWouldBeRows
numBytes = getBatchDataSize(filteredDown) + getBatchDataSize(filteredCb)
addBatch(filteredDown)
addBatch(filteredCb)
}
}
case _ => saveOnDeck(cb) // not a single batch requirement
}
saveOnDeck(cb)
} else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
saveOnDeck(cb)
} else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
Expand All @@ -394,9 +460,13 @@ abstract class AbstractGpuCoalesceIterator(
val isLastBatch = !(hasOnDeck || iter.hasNext)

// enforce single batch limit when appropriate
if (goal == RequireSingleBatch && !isLastBatch) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
if (!isLastBatch) {
goal match {
case _: RequireSingleBatchLike =>
throw new IllegalStateException("A single batch is required for this operation," +
" Please try increasing your partition count.")
case _ =>
}
}

numOutputRows += numRows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ class RowToColumnarIterator(
}

// enforce RequireSingleBatch limit
if (rowIter.hasNext && localGoal == RequireSingleBatch) {
if (rowIter.hasNext && localGoal.isInstanceOf[RequireSingleBatchLike]) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType}
import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.rapids.GpuOr
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks}
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuShuffledHashJoinMeta(
Expand Down Expand Up @@ -117,10 +119,24 @@ case class GpuShuffledHashJoinExec(
"GpuShuffledHashJoin does not support the execute() code path.")
}

// Goal to be used for the coalescing the build side. Note that this is internal to
// the join and not used for planning purposes. The two valid choices are `RequireSingleBatch` or
// `RequireSingleBatchWithFilter`
private lazy val buildGoal: CoalesceSizeGoal = joinType match {
case _: InnerLike | LeftSemi | LeftAnti =>
val nullFilteringMask = boundBuildKeys.map { bk =>
// coalesce(key1, false) or coalesce(key2, false) ... or ... coalesce(keyN, false)
// For any row with a key that is null, this filter mask will remove those rows.
GpuCoalesce(Seq(GpuCast(bk, BooleanType), GpuLiteral(false)))
}.reduce(GpuOr)
RequireSingleBatchWithFilter(nullFilteringMask)
case _ => RequireSingleBatch
}

override def childrenCoalesceGoal: Seq[CoalesceGoal] = (joinType, buildSide) match {
case (FullOuter, _) => Seq(RequireSingleBatch, RequireSingleBatch)
case (_, GpuBuildLeft) => Seq(RequireSingleBatch, null)
case (_, GpuBuildRight) => Seq(null, RequireSingleBatch)
case (_, GpuBuildLeft) => Seq(buildGoal, null)
case (_, GpuBuildRight) => Seq(null, buildGoal)
}

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
Expand Down Expand Up @@ -149,6 +165,7 @@ case class GpuShuffledHashJoinExec(
(streamIter, buildIter) => {
val (builtBatch, maybeBufferedStreamIter) =
GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
buildGoal,
batchSizeBytes,
localBuildOutput,
buildIter,
Expand Down Expand Up @@ -219,6 +236,7 @@ object GpuShuffledHashJoinExec extends Arm {
* because we hold onto the semaphore during the entire time after realizing the goal
* has been hit.
*
* @param buildGoal the build goal to use when coalescing batches
* @param hostTargetBatchSize target batch size goal on the host
* @param buildOutput output attributes of the build plan
* @param buildIter build iterator
Expand All @@ -230,6 +248,7 @@ object GpuShuffledHashJoinExec extends Arm {
* used for the join
*/
def getBuiltBatchAndStreamIter(
buildGoal: CoalesceSizeGoal,
hostTargetBatchSize: Long,
buildOutput: Seq[Attribute],
buildIter: Iterator[ColumnarBatch],
Expand Down Expand Up @@ -295,7 +314,7 @@ object GpuShuffledHashJoinExec extends Arm {
}
} else {
val buildBatch = getBuildBatchFromUnfinished(
Seq(hostConcatResult).iterator ++ hostConcatIter,
buildGoal, Seq(hostConcatResult).iterator ++ hostConcatIter,
buildOutput, spillCallback, coalesceMetricsMap)
buildTime += System.nanoTime() - startTime
(buildBatch, streamIter)
Expand All @@ -307,6 +326,7 @@ object GpuShuffledHashJoinExec extends Arm {
}

private def getBuildBatchFromUnfinished(
buildGoal: CoalesceSizeGoal,
iterWithPrior: Iterator[HostConcatResult],
buildOutput: Seq[Attribute],
spillCallback: SpillCallback,
Expand All @@ -322,10 +342,9 @@ object GpuShuffledHashJoinExec extends Arm {
iterWithPrior,
dataTypes,
coalesceMetricsMap)
val res = ConcatAndConsumeAll.getSingleBatchWithVerification(
val res = ConcatAndConsumeAll.getSingleBatchWithVerification(
new GpuCoalesceIterator(shuffleCoalesce,
dataTypes,
RequireSingleBatch,
dataTypes, buildGoal,
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric,
coalesceMetricsMap(GpuMetric.CONCAT_TIME),
coalesceMetricsMap(GpuMetric.OP_TIME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case (plan, null) =>
// No coalesce requested
insertCoalesce(plan, disableUntilInput)
case (plan, goal @ RequireSingleBatch) =>
case (plan, goal: RequireSingleBatchLike) =>
// Even if coalesce is disabled a single batch is required to make this operator work
// This should not cause bugs because we require a single batch in situations where
// Spark also buffers data, so any operator that needs coalesce disabled would also
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
opName) {

// RequireSingleBatch goal is intentionally not supported in this iterator
assert(goal != RequireSingleBatch)
assert(!goal.isInstanceOf[RequireSingleBatchLike])

var batchBuilder: GpuColumnVector.GpuColumnarBatchBuilderBase = _
var totalRows = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ trait GpuHashJoin extends GpuExec {
case GpuBuildLeft => GpuExec.outputBatching(right)
case GpuBuildRight => GpuExec.outputBatching(left)
}
if (batching == RequireSingleBatch) {
if (batching.isInstanceOf[RequireSingleBatchLike]) {
RequireSingleBatch
} else {
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ private class ExternalRowToColumnarIterator(
}

// enforce single batch limit when appropriate
if (rowIter.hasNext && localGoal == RequireSingleBatch) {
if (rowIter.hasNext && localGoal.isInstanceOf[RequireSingleBatchLike]) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
when(mockBuildIter.hasNext).thenReturn(false)
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
mockBuildIter,
Expand Down Expand Up @@ -71,6 +72,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
when(buildIter.buffered).thenReturn(buildBufferedIter)
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
buildIter,
Expand Down Expand Up @@ -103,6 +105,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
val buildIter = Seq(emptyBatch).iterator
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
buildIter,
Expand All @@ -129,6 +132,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
val buildIter = Seq(batch).iterator
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
buildIter,
Expand Down Expand Up @@ -182,6 +186,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
val buildIter = Seq(serializedBatch).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1024,
attrs,
buildIter,
Expand Down Expand Up @@ -215,6 +220,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
val buildIter = Seq(serializedBatch).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1024,
attrs,
buildIter,
Expand Down Expand Up @@ -252,6 +258,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
val buildIter = Seq(serializedBatch, serializedBatch2).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1,
attrs,
buildIter,
Expand Down Expand Up @@ -290,6 +297,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
val buildIter = Seq(serializedBatch, serializedBatch2).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1024,
attrs,
buildIter,
Expand Down