Skip to content

Commit

Permalink
Filter nulls from joins where possible to improve performance (NVIDIA…
Browse files Browse the repository at this point in the history
…#754)

Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored and JustPlay committed Sep 13, 2020
1 parent e2adf57 commit 274ec7c
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 96 deletions.
16 changes: 14 additions & 2 deletions integration_tests/src/main/python/tpcds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b',
'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49',
'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59',
'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69',
'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69',
'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89',
'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99',
'ss_max', 'ss_maxb']
Expand All @@ -35,5 +35,17 @@
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', queries)
def test_tpcds(tpcds, query):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : tpcds.do_test_query(query),
conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'})

# TPC-DS queries exercised by test_tpcds_no_var_agg, which runs them without passing the
# spark.rapids.sql.variableFloatAgg.enabled setting that test_tpcds uses.
no_var_agg_queries = ['q67', 'q70']

@incompat
@ignore_order
@approximate_float
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', no_var_agg_queries)
def test_tpcds_no_var_agg(tpcds, query):
    """Run one TPC-DS query from no_var_agg_queries and compare GPU and CPU results.

    No extra configuration is passed to assert_gpu_and_cpu_are_equal_collect here.
    """
    def run_query(spark):
        # The spark argument is unused; the tpcds fixture runs the query itself.
        return tpcds.do_test_query(query)

    assert_gpu_and_cpu_are_equal_collect(run_query)
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
// TODO clean up intermediate results...
val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
val combined = combine(keys, broadcastRelation.value.batch)
val ret = GpuColumnVector.from(combined)
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
Expand All @@ -39,6 +40,11 @@ object GpuHashJoin {
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

/**
 * Increments the reference count of every base column that
 * GpuColumnVector.extractBases returns for the batch.
 *
 * @param cb the batch whose columns get an extra reference
 * @return the same batch instance, so the call can wrap another expression
 */
def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
  for (base <- GpuColumnVector.extractBases(cb)) {
    base.incRefCount()
  }
  cb
}
}

trait GpuHashJoin extends GpuExec with HashJoin {
Expand Down Expand Up @@ -110,6 +116,67 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

// Spark adds rules that filter out nulls for some types of joins, but it does not
// guarantee that every null has been removed by the time the data gets to this point.
// Because of https://github.com/rapidsai/cudf/issues/6052 we need to filter out the
// remaining nulls ourselves until that issue is fixed. Which keys are filtered depends
// on the join type:
// InnerLike | LeftSemi =>
// filter left and right keys
// RightOuter =>
// filter left keys
// LeftOuter | LeftAnti =>
// filter right keys

/**
 * Whether rows with null build keys should be filtered out of the built table.
 * True only when the (joinType, buildSide) combination is one of the cases matched
 * below and at least one build key expression is nullable. Computed once, on first use.
 */
private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
  val anyBuildKeyNullable = gpuBuildKeys.exists(key => key.nullable)
  val joinAllowsFilter = (joinType, buildSide) match {
    case (_: InnerLike | LeftSemi, _) => true
    case (RightOuter, BuildLeft) => true
    case (LeftOuter | LeftAnti, BuildRight) => true
    case _ => false
  }
  joinAllowsFilter && anyBuildKeyNullable
}

/**
 * Whether rows with null streamed keys should be filtered out of each streamed batch.
 * True only when the (joinType, buildSide) combination is one of the cases matched
 * below and at least one streamed key expression is nullable. Computed once, on first use.
 */
private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
  val anyStreamKeyNullable = gpuStreamedKeys.exists(key => key.nullable)
  val joinAllowsFilter = (joinType, buildSide) match {
    case (_: InnerLike | LeftSemi, _) => true
    case (RightOuter, BuildRight) => true
    case (LeftOuter | LeftAnti, BuildLeft) => true
    case _ => false
  }
  joinAllowsFilter && anyStreamKeyNullable
}

/**
 * Builds a single predicate that is the AND of an is-not-null check on every key.
 * Each check reads the key through a bound reference at the key's position in exprs,
 * so the i-th key is expected at column ordinal i of the batch being filtered.
 *
 * @param exprs the key expressions; must be non-empty because the checks are combined
 *              with reduce
 * @return the combined not-null predicate
 */
private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = {
  val notNullChecks = exprs.zipWithIndex.map { case (keyExpr, ordinal) =>
    GpuIsNotNull(GpuBoundReference(ordinal, keyExpr.dataType, keyExpr.nullable))
  }
  notNullChecks.reduce(GpuAnd)
}

// Not-null predicate over the build-side keys, created by mkNullFilterExpr on first use.
private[this] lazy val builtTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuBuildKeys)

// Not-null predicate over the streamed-side keys, created by mkNullFilterExpr on first use.
private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

/**
 * Filters rows with null build keys out of builtBatch when this join calls for it.
 *
 * Ownership of builtBatch passes to this method: callers should close only the batch
 * that is returned, never builtBatch itself.
 * NOTE(review): this relies on GpuFilter closing the batch it is given -- confirm.
 *
 * @param builtBatch the build-side batch to filter; the filter predicate reads the build
 *                   keys by ordinal starting at 0, so presumably the key columns must
 *                   come first -- verify against callers
 * @return the filtered batch, or builtBatch itself when no filtering is needed
 */
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = {
  if (!shouldFilterBuiltTableForNulls) {
    builtBatch
  } else {
    GpuFilter(builtBatch, builtTableNullFilterExpression)
  }
}

/**
 * Filters rows with null streamed keys out of streamedBatch when this join calls for it.
 *
 * Ownership of streamedBatch passes to this method; close only the returned batch.
 * NOTE(review): this relies on GpuFilter closing the batch it is given -- confirm.
 *
 * @param streamedBatch the streamed-side batch to filter; the filter predicate reads the
 *                      streamed keys by ordinal starting at 0
 * @return the filtered batch, or streamedBatch itself when no filtering is needed
 */
private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = {
  if (!shouldFilterStreamTableForNulls) {
    streamedBatch
  } else {
    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
  }
}

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
Expand Down Expand Up @@ -172,16 +239,14 @@ trait GpuHashJoin extends GpuExec with HashJoin {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

val streamedTable = try {
val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
try {
val combined = combine(streamedKeysBatch, streamedBatch)
GpuColumnVector.from(combined)
} finally {
streamedKeysBatch.close()
val combined = withResource(streamedBatch) { streamedBatch =>
withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
streamedKeysBatch =>
GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
}
} finally {
streamedBatch.close()
}
val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
val buildBatch =
ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
val builtTable = try {
// Combine does not inc any reference counting
val combined = combine(keys, buildBatch)
combinedSize =
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(combined)
} finally {
keys.close()
buildBatch.close()
val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
val filtered = filterBuiltTableIfNeeded(combined)
combinedSize =
GpuColumnVector.extractColumns(filtered)
.map(_.getBase.getDeviceMemorySize).sum.toInt
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
}
}
}

val delta = System.nanoTime() - startTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
// TODO clean up intermediate results...
val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
val combined = combine(keys, broadcastRelation.value.batch)
val ret = GpuColumnVector.from(combined)
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.HashJoin
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
Expand All @@ -40,6 +41,11 @@ object GpuHashJoin {
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

/**
 * Increments the reference count of every base column that
 * GpuColumnVector.extractBases returns for the batch.
 *
 * @param cb the batch whose columns get an extra reference
 * @return the same batch instance, so the call can wrap another expression
 */
def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
  for (base <- GpuColumnVector.extractBases(cb)) {
    base.incRefCount()
  }
  cb
}
}

trait GpuHashJoin extends GpuExec with HashJoin {
Expand Down Expand Up @@ -111,6 +117,67 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

// Spark adds rules that filter out nulls for some types of joins, but it does not
// guarantee that every null has been removed by the time the data gets to this point.
// Because of https://github.com/rapidsai/cudf/issues/6052 we need to filter out the
// remaining nulls ourselves until that issue is fixed. Which keys are filtered depends
// on the join type:
// InnerLike | LeftSemi =>
// filter left and right keys
// RightOuter =>
// filter left keys
// LeftOuter | LeftAnti =>
// filter right keys

/**
 * Whether rows with null build keys should be filtered out of the built table.
 * True only when the (joinType, buildSide) combination is one of the cases matched
 * below and at least one build key expression is nullable. Computed once, on first use.
 */
private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
  val anyBuildKeyNullable = gpuBuildKeys.exists(key => key.nullable)
  val joinAllowsFilter = (joinType, buildSide) match {
    case (_: InnerLike | LeftSemi, _) => true
    case (RightOuter, BuildLeft) => true
    case (LeftOuter | LeftAnti, BuildRight) => true
    case _ => false
  }
  joinAllowsFilter && anyBuildKeyNullable
}

/**
 * Whether rows with null streamed keys should be filtered out of each streamed batch.
 * True only when the (joinType, buildSide) combination is one of the cases matched
 * below and at least one streamed key expression is nullable. Computed once, on first use.
 */
private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
  val anyStreamKeyNullable = gpuStreamedKeys.exists(key => key.nullable)
  val joinAllowsFilter = (joinType, buildSide) match {
    case (_: InnerLike | LeftSemi, _) => true
    case (RightOuter, BuildRight) => true
    case (LeftOuter | LeftAnti, BuildLeft) => true
    case _ => false
  }
  joinAllowsFilter && anyStreamKeyNullable
}

/**
 * Builds a single predicate that is the AND of an is-not-null check on every key.
 * Each check reads the key through a bound reference at the key's position in exprs,
 * so the i-th key is expected at column ordinal i of the batch being filtered.
 *
 * @param exprs the key expressions; must be non-empty because the checks are combined
 *              with reduce
 * @return the combined not-null predicate
 */
private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = {
  val notNullChecks = exprs.zipWithIndex.map { case (keyExpr, ordinal) =>
    GpuIsNotNull(GpuBoundReference(ordinal, keyExpr.dataType, keyExpr.nullable))
  }
  notNullChecks.reduce(GpuAnd)
}

// Not-null predicate over the build-side keys, created by mkNullFilterExpr on first use.
private[this] lazy val builtTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuBuildKeys)

// Not-null predicate over the streamed-side keys, created by mkNullFilterExpr on first use.
private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

/**
 * Filters rows with null build keys out of builtBatch when this join calls for it.
 *
 * Ownership of builtBatch passes to this method: callers should close only the batch
 * that is returned, never builtBatch itself.
 * NOTE(review): this relies on GpuFilter closing the batch it is given -- confirm.
 *
 * @param builtBatch the build-side batch to filter; the filter predicate reads the build
 *                   keys by ordinal starting at 0, so presumably the key columns must
 *                   come first -- verify against callers
 * @return the filtered batch, or builtBatch itself when no filtering is needed
 */
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = {
  if (!shouldFilterBuiltTableForNulls) {
    builtBatch
  } else {
    GpuFilter(builtBatch, builtTableNullFilterExpression)
  }
}

/**
 * Filters rows with null streamed keys out of streamedBatch when this join calls for it.
 *
 * Ownership of streamedBatch passes to this method; close only the returned batch.
 * NOTE(review): this relies on GpuFilter closing the batch it is given -- confirm.
 *
 * @param streamedBatch the streamed-side batch to filter; the filter predicate reads the
 *                      streamed keys by ordinal starting at 0
 * @return the filtered batch, or streamedBatch itself when no filtering is needed
 */
private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = {
  if (!shouldFilterStreamTableForNulls) {
    streamedBatch
  } else {
    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
  }
}

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
Expand Down Expand Up @@ -173,16 +240,14 @@ trait GpuHashJoin extends GpuExec with HashJoin {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

val streamedTable = try {
val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
try {
val combined = combine(streamedKeysBatch, streamedBatch)
GpuColumnVector.from(combined)
} finally {
streamedKeysBatch.close()
val combined = withResource(streamedBatch) { streamedBatch =>
withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
streamedKeysBatch =>
GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
}
} finally {
streamedBatch.close()
}
val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
val buildBatch =
ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
val builtTable = try {
// Combine does not inc any reference counting
val combined = combine(keys, buildBatch)
combinedSize =
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(combined)
} finally {
keys.close()
buildBatch.close()
val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
val filtered = filterBuiltTableIfNeeded(combined)
combinedSize =
GpuColumnVector.extractColumns(filtered)
.map(_.getBase.getDeviceMemorySize).sum.toInt
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
}
}
}

val delta = System.nanoTime() - startTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
// TODO clean up intermediate results...
val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
val combined = combine(keys, broadcastRelation.value.batch)
val ret = GpuColumnVector.from(combined)
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
Expand Down
Loading

0 comments on commit 274ec7c

Please sign in to comment.