
Commit df00904

Filter nulls from joins where possible to improve performance. (NVIDIA#594)

* Filter nulls from joins where possible to improve performance.

Signed-off-by: Robert (Bobby) Evans <[email protected]>

* Addressed review comments

Signed-off-by: Robert (Bobby) Evans <[email protected]>

* Updated patch for other shims
revans2 authored Aug 20, 2020
1 parent ce1f9b8 commit df00904
Showing 11 changed files with 292 additions and 72 deletions.
16 changes: 14 additions & 2 deletions integration_tests/src/main/python/tpcds_test.py
@@ -23,8 +23,8 @@
'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b',
'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49',
'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59',
- 'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69',
- 'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
+ 'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69',
+ 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89',
'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99',
'ss_max', 'ss_maxb']
@@ -35,5 +35,17 @@
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', queries)
def test_tpcds(tpcds, query):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : tpcds.do_test_query(query),
conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'})

+ no_var_agg_queries = ['q67', 'q70']
+
+ @incompat
+ @ignore_order
+ @approximate_float
+ @allow_non_gpu(any=True)
+ @pytest.mark.parametrize('query', no_var_agg_queries)
+ def test_tpcds_no_var_agg(tpcds, query):
+     assert_gpu_and_cpu_are_equal_collect(
+             lambda spark : tpcds.do_test_query(query))
@@ -137,10 +137,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
- // TODO clean up intermediate results...
- val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
- val combined = combine(keys, broadcastRelation.value.batch)
- val ret = GpuColumnVector.from(combined)
+ val ret = withResource(
+     GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+   val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+   val filtered = filterBuiltTableIfNeeded(combined)
+   withResource(filtered) { filtered =>
+     GpuColumnVector.from(filtered)
+   }
+ }

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
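The rewritten builtTable block above relies on a withResource loan helper so that the projected keys and the filtered batch are closed even if an exception escapes partway through. A minimal sketch of that pattern, assuming only that the resource is AutoCloseable (the helper in this repository may carry extra overloads):

```scala
object WithResourceSketch {
  // Loan pattern: run `body` with the resource, then close it no matter how
  // the block exits.
  def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
    try {
      body(resource)
    } finally {
      resource.close() // released even when body throws
    }
}
```

The per-column noWarnLeakExpected() calls that follow are unchanged: as the existing comment says, the lifetime of the resulting Table is not controlled here, so leak warnings are suppressed deliberately.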
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -39,6 +40,11 @@ object GpuHashJoin {
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
cb
}
}

trait GpuHashJoin extends GpuExec with HashJoin {
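The incRefCount helper added to object GpuHashJoin above is needed because combine does not take any references of its own (the old shuffled-join code below even notes "Combine does not inc any reference counting"). Before the combined batch is handed to the null filter, which closes the batch it is given (see the doc comment on filterBuiltTableIfNeeded further down), every underlying column gets one extra reference so the original build or broadcast batch stays valid for its owner. A toy sketch of that ownership contract, using a stand-in for the reference-counted cudf column rather than the real API:

```scala
object IncRefCountSketch {
  // Stand-in for a reference-counted device column: memory is freed only when
  // the last reference is released.
  final class RefCountedColumn(name: String) extends AutoCloseable {
    private var refs = 1
    def incRefCount(): this.type = { refs += 1; this }
    override def close(): Unit = {
      refs -= 1
      if (refs == 0) println(s"$name: device memory released")
    }
  }

  // A consumer that closes whatever it is given, the way the filter closes the
  // batch it filters.
  def filterAndClose(col: RefCountedColumn): Unit = col.close()

  def main(args: Array[String]): Unit = {
    val shared = new RefCountedColumn("build-key")
    filterAndClose(shared.incRefCount()) // consumer releases the extra reference
    shared.close()                       // owner releases; memory freed only now
  }
}
```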
@@ -110,6 +116,63 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

// Spark adds in rules to filter out nulls for some types of joins, but it does not
// guarantee 100% that all nulls will be filtered out by the time they get to
// this point, but because of https://github.com/rapidsai/cudf/issues/6052
// we need to filter out the nulls ourselves until it is fixed.
// InnerLike | LeftSemi =>
// filter left and right keys
// RightOuter =>
// filter left keys
// LeftOuter | LeftAnti =>
// filter right keys

private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
(joinType, buildSide) match {
case (_: InnerLike | LeftSemi, _) => builtAnyNullable
case (RightOuter, BuildLeft) => builtAnyNullable
case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
case _ => false
}
}

private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
(joinType, buildSide) match {
case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
case (RightOuter, BuildRight) => streamedAnyNullable
case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
case _ => false
}
}

private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
exprs.zipWithIndex.map { kv =>
GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
}.reduce(GpuAnd)

private[this] lazy val builtTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuBuildKeys)

private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

/**
* Filter the builtBatch if needed. builtBatch will be closed.
* @param builtBatch
* @return
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
GpuFilter(builtBatch, builtTableNullFilterExpression)
} else {
builtBatch
}

private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
GpuFilter(streamedBatch, streamedTableNullFilterExpression)

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
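The mkNullFilterExpr helper above is simply one GpuIsNotNull per bound key column, folded together with GpuAnd. A stripped-down model of that construction, with placeholder expression classes instead of the real Catalyst/GPU ones:

```scala
object NullFilterExprSketch {
  // Placeholder expression tree standing in for GpuBoundReference,
  // GpuIsNotNull and GpuAnd.
  sealed trait Expr
  final case class BoundRef(ordinal: Int) extends Expr
  final case class IsNotNull(child: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // One IsNotNull per key ordinal, reduced into a single conjunction.
  def mkNullFilter(numKeys: Int): Expr =
    (0 until numKeys).map(i => IsNotNull(BoundRef(i)): Expr).reduce(And(_, _))

  def main(args: Array[String]): Unit =
    // Prints And(IsNotNull(BoundRef(0)),IsNotNull(BoundRef(1)))
    println(mkNullFilter(2))
}
```

Whether that predicate is applied to the built table, the streamed table, or neither is decided once per join by the two shouldFilter*ForNulls lazy vals, so the per-batch cost is only the GpuFilter call itself.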
@@ -134,7 +197,11 @@ trait GpuHashJoin extends GpuExec with HashJoin {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
- val cb = stream.next()
+ val cb = if (shouldFilterStreamTableForNulls) {
+   filterStreamedTable(stream.next())
+ } else {
+   stream.next()
+ }
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
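Stepping back from the mechanics: dropping rows with null keys is safe for the join types listed in the comment above because a null key never compares equal to anything, and for those join types the unmatched rows on the side being filtered never appear in the output anyway. A tiny worked illustration of the inner-join case, in plain Scala rather than Spark:

```scala
object NullKeyJoinSketch {
  def main(args: Array[String]): Unit = {
    // Option[Int] plays the role of a nullable join key.
    val build  = Seq((Some(1), "a"), (None, "b"), (Some(2), "c"))
    val stream = Seq((Some(1), "x"), (None, "y"))

    // Inner join with SQL-style null semantics: null = null is not a match,
    // so rows with a null key can be filtered out up front without changing
    // the result; they only make the build-side hash table bigger.
    val joined = for {
      (buildKey, buildVal)   <- build
      (streamKey, streamVal) <- stream
      if buildKey.isDefined && buildKey == streamKey
    } yield (buildKey.get, buildVal, streamVal)

    println(joined) // List((1,a,x)); the None rows contribute nothing
  }
}
```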
@@ -117,20 +117,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
- val buildBatch =
-   ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
- val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
- val builtTable = try {
-   // Combine does not inc any reference counting
-   val combined = combine(keys, buildBatch)
-   combinedSize =
-     GpuColumnVector.extractColumns(combined)
-       .map(_.getBase.getDeviceMemorySize).sum.toInt
-   GpuColumnVector.from(combined)
- } finally {
-   keys.close()
-   buildBatch.close()
+ val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+     buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+   withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+     val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+     val filtered = filterBuiltTableIfNeeded(combined)
+     combinedSize =
+       GpuColumnVector.extractColumns(filtered)
+         .map(_.getBase.getDeviceMemorySize).sum.toInt
+     withResource(filtered) { filtered =>
+       GpuColumnVector.from(filtered)
+     }
+   }
+ }

val delta = System.nanoTime() - startTime
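The shuffled-join build path above nests two withResource blocks: the concatenated build batch and the projected keys are closed on the way out, while the value computed innermost (the built Table, plus the combinedSize measured on the filtered columns) escapes. A toy sketch of that shape, with a print-on-close stand-in instead of real batches:

```scala
object NestedWithResourceSketch {
  // Print-on-close stand-in for a columnar batch, so the close order is visible.
  final case class Resource(name: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed $name")
  }

  def withResource[R <: AutoCloseable, T](r: R)(body: R => T): T =
    try body(r) finally r.close()

  def main(args: Array[String]): Unit = {
    // The innermost value escapes; both intermediates are already closed by
    // the time it is returned, mirroring buildBatch/keys versus the built Table.
    val built = withResource(Resource("buildBatch")) { batch =>
      withResource(Resource("projectedKeys")) { keys =>
        s"table from ${batch.name} + ${keys.name}"
      }
    }
    println(built) // printed after "closed projectedKeys" and "closed buildBatch"
  }
}
```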
@@ -138,10 +138,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
- // TODO clean up intermediate results...
- val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
- val combined = combine(keys, broadcastRelation.value.batch)
- val ret = GpuColumnVector.from(combined)
+ val ret = withResource(
+     GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+   val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+   val filtered = filterBuiltTableIfNeeded(combined)
+   withResource(filtered) { filtered =>
+     GpuColumnVector.from(filtered)
+   }
+ }

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.HashJoin
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -40,6 +41,11 @@ object GpuHashJoin {
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
cb
}
}

trait GpuHashJoin extends GpuExec with HashJoin {
@@ -111,6 +117,58 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

// Spark adds in rules to filter out nulls for some types of joins, but it does not
// guarantee 100% that all nulls will be filtered out by the time they get to
// this point, but because of https://github.com/rapidsai/cudf/issues/6052
// we need to filter out the nulls ourselves until it is fixed.
// InnerLike | LeftSemi =>
// filter left and right keys
// RightOuter =>
// filter left keys
// LeftOuter | LeftAnti =>
// filter right keys

private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
(joinType, buildSide) match {
case (_: InnerLike | LeftSemi, _) => builtAnyNullable
case (RightOuter, BuildLeft) => builtAnyNullable
case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
case _ => false
}
}

private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
(joinType, buildSide) match {
case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
case (RightOuter, BuildRight) => streamedAnyNullable
case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
case _ => false
}
}

private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
exprs.zipWithIndex.map { kv =>
GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
}.reduce(GpuAnd)

private[this] lazy val builtTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuBuildKeys)

private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
GpuFilter(builtBatch, builtTableNullFilterExpression)
} else {
builtBatch
}

private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
GpuFilter(streamedBatch, streamedTableNullFilterExpression)

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
@@ -135,7 +193,11 @@ trait GpuHashJoin extends GpuExec with HashJoin {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
- val cb = stream.next()
+ val cb = if (shouldFilterStreamTableForNulls) {
+   filterStreamedTable(stream.next())
+ } else {
+   stream.next()
+ }
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
@@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
- val buildBatch =
-   ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
- val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
- val builtTable = try {
-   // Combine does not inc any reference counting
-   val combined = combine(keys, buildBatch)
-   combinedSize =
-     GpuColumnVector.extractColumns(combined)
-       .map(_.getBase.getDeviceMemorySize).sum.toInt
-   GpuColumnVector.from(combined)
- } finally {
-   keys.close()
-   buildBatch.close()
+ val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+     buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+   withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+     val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+     val filtered = filterBuiltTableIfNeeded(combined)
+     combinedSize =
+       GpuColumnVector.extractColumns(filtered)
+         .map(_.getBase.getDeviceMemorySize).sum.toInt
+     withResource(filtered) { filtered =>
+       GpuColumnVector.from(filtered)
+     }
+   }
+ }

val delta = System.nanoTime() - startTime
@@ -143,10 +143,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
- // TODO clean up intermediate results...
- val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
- val combined = combine(keys, broadcastRelation.value.batch)
- val ret = GpuColumnVector.from(combined)
+ val ret = withResource(
+     GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+   val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+   val filtered = filterBuiltTableIfNeeded(combined)
+   withResource(filtered) { filtered =>
+     GpuColumnVector.from(filtered)
+   }
+ }

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret