From df00904433514930a2dbbdd8ae3ec9043dc3dc3d Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 20 Aug 2020 15:02:27 -0500
Subject: [PATCH] Filter nulls from joins where possible to improve performance. (#594)

* Filter nulls from joins where possible to improve performance.

Signed-off-by: Robert (Bobby) Evans

* Addressed review comments

Signed-off-by: Robert (Bobby) Evans

* Updated patch for other shims
---
 .../src/main/python/tpcds_test.py             | 16 ++++-
 .../spark300/GpuBroadcastHashJoinExec.scala   | 13 ++--
 .../rapids/shims/spark300/GpuHashJoin.scala   | 69 ++++++++++++++++++-
 .../spark300/GpuShuffledHashJoinExec.scala    | 26 +++----
 .../spark300db/GpuBroadcastHashJoinExec.scala | 13 ++--
 .../rapids/shims/spark300db/GpuHashJoin.scala | 64 ++++++++++++++++-
 .../spark300db/GpuShuffledHashJoinExec.scala  | 26 +++----
 .../spark301/GpuBroadcastHashJoinExec.scala   | 13 ++--
 .../rapids/shims/spark310/GpuHashJoin.scala   | 64 ++++++++++++++++-
 .../spark310/GpuShuffledHashJoinExec.scala    | 26 +++----
 .../spark/rapids/basicPhysicalOperators.scala | 34 ++++-----
 11 files changed, 292 insertions(+), 72 deletions(-)

diff --git a/integration_tests/src/main/python/tpcds_test.py b/integration_tests/src/main/python/tpcds_test.py
index cc91b42bf5d..5b8780af375 100644
--- a/integration_tests/src/main/python/tpcds_test.py
+++ b/integration_tests/src/main/python/tpcds_test.py
@@ -23,8 +23,8 @@
         'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a',
         'q39b', 'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48',
         'q49', 'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59',
-        'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69',
-        'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
+        'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69',
+        'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
         'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89',
         'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99',
         'ss_max', 'ss_maxb']
@@ -35,5 +35,17 @@
 @allow_non_gpu(any=True)
 @pytest.mark.parametrize('query', queries)
 def test_tpcds(tpcds, query):
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : tpcds.do_test_query(query),
+            conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'})
+
+no_var_agg_queries = ['q67', 'q70']
+
+@incompat
+@ignore_order
+@approximate_float
+@allow_non_gpu(any=True)
+@pytest.mark.parametrize('query', no_var_agg_queries)
+def test_tpcds_no_var_agg(tpcds, query):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : tpcds.do_test_query(query))
diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala
index 610b408d418..2293d9471a3 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala
@@ -137,10 +137,15 @@ case class GpuBroadcastHashJoinExec(
     val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
 
     lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+          GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }
+
       // Don't warn for a leak, because we cannot control when we are done with this
       (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
       ret
diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala
index b3eb7a39fa4..1b2686b6334 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.rapids.GpuAnd
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object GpuHashJoin {
@@ -39,6 +40,11 @@ object GpuHashJoin {
     }
     case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
   }
+
+  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
+    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
+    cb
+  }
 }
 
 trait GpuHashJoin extends GpuExec with HashJoin {
@@ -110,6 +116,63 @@ trait GpuHashJoin extends GpuExec with HashJoin {
     output.indices.map (v => v + joinLength)
   }
 
+  // Spark adds rules to filter out nulls for some types of joins, but it does not
+  // guarantee that all of the nulls will be filtered out by the time they get to
+  // this point. Because of https://github.com/rapidsai/cudf/issues/6052
+  // we need to filter out the nulls ourselves until that issue is fixed.
+  // InnerLike | LeftSemi =>
+  //   filter left and right keys
+  // RightOuter =>
+  //   filter left keys
+  // LeftOuter | LeftAnti =>
+  //   filter right keys
+
+  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
+    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
+      case (RightOuter, BuildLeft) => builtAnyNullable
+      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
+    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
+      case (RightOuter, BuildRight) => streamedAnyNullable
+      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
+    exprs.zipWithIndex.map { kv =>
+      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
+    }.reduce(GpuAnd)
+
+  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuBuildKeys)
+
+  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuStreamedKeys)
+
+  /**
+   * Filter the builtBatch if needed. builtBatch will be closed.
+   * @param builtBatch the batch to filter
+   * @return the filtered batch, or the original batch if no filtering was needed
+   */
+  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
+    if (shouldFilterBuiltTableForNulls) {
+      GpuFilter(builtBatch, builtTableNullFilterExpression)
+    } else {
+      builtBatch
+    }
+
+  private[this] def filterStreamedTable(streamedBatch: ColumnarBatch): ColumnarBatch =
+    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
+
   def doJoin(builtTable: Table,
       stream: Iterator[ColumnarBatch],
       boundCondition: Option[Expression],
@@ -134,7 +197,11 @@ trait GpuHashJoin extends GpuExec with HashJoin {
       override def hasNext: Boolean = {
         while (nextCb.isEmpty && (first || stream.hasNext)) {
           if (stream.hasNext) {
-            val cb = stream.next()
+            val cb = if (shouldFilterStreamTableForNulls) {
+              filterStreamedTable(stream.next())
+            } else {
+              stream.next()
+            }
             val startTime = System.nanoTime()
             nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
               numOutputBatches, joinTime, filterTime)
diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
index 08875868787..6997608f646 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
@@ -117,20 +117,20 @@ case class GpuShuffledHashJoinExec(
     streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
       (streamIter, buildIter) => {
         var combinedSize = 0
+
         val startTime = System.nanoTime()
-        val buildBatch =
-          ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
-        val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
-        val builtTable = try {
-          // Combine does not inc any reference counting
-          val combined = combine(keys, buildBatch)
-          combinedSize =
-            GpuColumnVector.extractColumns(combined)
-              .map(_.getBase.getDeviceMemorySize).sum.toInt
-          GpuColumnVector.from(combined)
-        } finally {
-          keys.close()
-          buildBatch.close()
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+            buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            val filtered = filterBuiltTableIfNeeded(combined)
+            combinedSize =
+              GpuColumnVector.extractColumns(filtered)
+                .map(_.getBase.getDeviceMemorySize).sum.toInt
+            withResource(filtered) { filtered =>
+              GpuColumnVector.from(filtered)
+            }
+          }
         }
         val delta = System.nanoTime() - startTime
 
diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala
index a004e0fccb6..a8307fe789c 100644
--- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala
+++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala
@@ -138,10 +138,15 @@ case class GpuBroadcastHashJoinExec(
     val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
 
     lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+          GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }
+
       // Don't warn for a leak, because we cannot control when we are done with this
       (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
       ret
diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala
index b80db78fb28..2e59d034e1a 100644
--- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala
+++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.execution.joins.HashJoin
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.rapids.GpuAnd
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object GpuHashJoin {
@@ -40,6 +41,11 @@ object GpuHashJoin {
     }
     case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
   }
+
+  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
+    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
+    cb
+  }
 }
 
 trait GpuHashJoin extends GpuExec with HashJoin {
@@ -111,6 +117,58 @@ trait GpuHashJoin extends GpuExec with HashJoin {
     output.indices.map (v => v + joinLength)
   }
 
+  // Spark adds rules to filter out nulls for some types of joins, but it does not
+  // guarantee that all of the nulls will be filtered out by the time they get to
+  // this point. Because of https://github.com/rapidsai/cudf/issues/6052
+  // we need to filter out the nulls ourselves until that issue is fixed.
+  // InnerLike | LeftSemi =>
+  //   filter left and right keys
+  // RightOuter =>
+  //   filter left keys
+  // LeftOuter | LeftAnti =>
+  //   filter right keys
+
+  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
+    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
+      case (RightOuter, BuildLeft) => builtAnyNullable
+      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
+    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
+      case (RightOuter, BuildRight) => streamedAnyNullable
+      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
+    exprs.zipWithIndex.map { kv =>
+      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
+    }.reduce(GpuAnd)
+
+  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuBuildKeys)
+
+  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuStreamedKeys)
+
+  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
+    if (shouldFilterBuiltTableForNulls) {
+      GpuFilter(builtBatch, builtTableNullFilterExpression)
+    } else {
+      builtBatch
+    }
+
+  private[this] def filterStreamedTable(streamedBatch: ColumnarBatch): ColumnarBatch =
+    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
+
   def doJoin(builtTable: Table,
       stream: Iterator[ColumnarBatch],
       boundCondition: Option[Expression],
@@ -135,7 +193,11 @@ trait GpuHashJoin extends GpuExec with HashJoin {
       override def hasNext: Boolean = {
         while (nextCb.isEmpty && (first || stream.hasNext)) {
           if (stream.hasNext) {
-            val cb = stream.next()
+            val cb = if (shouldFilterStreamTableForNulls) {
+              filterStreamedTable(stream.next())
+            } else {
+              stream.next()
+            }
             val startTime = System.nanoTime()
             nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
               numOutputBatches, joinTime, filterTime)
diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala
index ad481219fc6..2aa4f83ad13 100644
--- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala
+++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala
@@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec(
     streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
       (streamIter, buildIter) => {
         var combinedSize = 0
+
         val startTime = System.nanoTime()
-        val buildBatch =
-          ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
-        val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
-        val builtTable = try {
-          // Combine does not inc any reference counting
-          val combined = combine(keys, buildBatch)
-          combinedSize =
-            GpuColumnVector.extractColumns(combined)
-              .map(_.getBase.getDeviceMemorySize).sum.toInt
-          GpuColumnVector.from(combined)
-        } finally {
-          keys.close()
-          buildBatch.close()
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+            buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            val filtered = filterBuiltTableIfNeeded(combined)
+            combinedSize =
+              GpuColumnVector.extractColumns(filtered)
+                .map(_.getBase.getDeviceMemorySize).sum.toInt
+            withResource(filtered) { filtered =>
+              GpuColumnVector.from(filtered)
+            }
+          }
         }
         val delta = System.nanoTime() - startTime
 
diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala
index ef46a37e832..45d5c6e4303 100644
--- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala
+++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala
@@ -143,10 +143,15 @@ case class GpuBroadcastHashJoinExec(
     val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
 
     lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+          GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }
+
       // Don't warn for a leak, because we cannot control when we are done with this
       (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
       ret
diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala
index 27d10c5c0b3..4ed12d95727 100644
--- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala
+++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.execution.joins.HashJoinWithoutCodegen
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.rapids.GpuAnd
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object GpuHashJoin {
@@ -40,6 +41,11 @@ object GpuHashJoin {
     }
     case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
   }
+
+  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
+    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
+    cb
+  }
 }
 
 trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
@@ -111,6 +117,58 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
     output.indices.map (v => v + joinLength)
   }
 
+  // Spark adds rules to filter out nulls for some types of joins, but it does not
+  // guarantee that all of the nulls will be filtered out by the time they get to
+  // this point. Because of https://github.com/rapidsai/cudf/issues/6052
+  // we need to filter out the nulls ourselves until that issue is fixed.
+  // InnerLike | LeftSemi =>
+  //   filter left and right keys
+  // RightOuter =>
+  //   filter left keys
+  // LeftOuter | LeftAnti =>
+  //   filter right keys
+
+  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
+    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
+      case (RightOuter, BuildLeft) => builtAnyNullable
+      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
+    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
+      case (RightOuter, BuildRight) => streamedAnyNullable
+      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
+    exprs.zipWithIndex.map { kv =>
+      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
+    }.reduce(GpuAnd)
+
+  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuBuildKeys)
+
+  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuStreamedKeys)
+
+  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
+    if (shouldFilterBuiltTableForNulls) {
+      GpuFilter(builtBatch, builtTableNullFilterExpression)
+    } else {
+      builtBatch
+    }
+
+  private[this] def filterStreamedTable(streamedBatch: ColumnarBatch): ColumnarBatch =
+    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
+
   def doJoin(builtTable: Table,
       stream: Iterator[ColumnarBatch],
       boundCondition: Option[Expression],
@@ -135,7 +193,11 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
       override def hasNext: Boolean = {
         while (nextCb.isEmpty && (first || stream.hasNext)) {
           if (stream.hasNext) {
-            val cb = stream.next()
+            val cb = if (shouldFilterStreamTableForNulls) {
+              filterStreamedTable(stream.next())
+            } else {
+              stream.next()
+            }
             val startTime = System.nanoTime()
             nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
               numOutputBatches, joinTime, filterTime)
diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala
index 7e281c698b1..719639fbdfc 100644
--- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala
+++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala
@@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec(
     streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
       (streamIter, buildIter) => {
         var combinedSize = 0
+
         val startTime = System.nanoTime()
-        val buildBatch =
-          ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
-        val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
-        val builtTable = try {
-          // Combine does not inc any reference counting
-          val combined = combine(keys, buildBatch)
-          combinedSize =
-            GpuColumnVector.extractColumns(combined)
-              .map(_.getBase.getDeviceMemorySize).sum.toInt
-          GpuColumnVector.from(combined)
-        } finally {
-          keys.close()
-          buildBatch.close()
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+            buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            val filtered = filterBuiltTableIfNeeded(combined)
+            combinedSize =
+              GpuColumnVector.extractColumns(filtered)
+                .map(_.getBase.getDeviceMemorySize).sum.toInt
+            withResource(filtered) { filtered =>
+              GpuColumnVector.from(filtered)
+            }
+          }
         }
         val delta = System.nanoTime() - startTime
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
index ae4c9f46614..a783e905939 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
@@ -94,32 +94,34 @@ case class GpuProjectExec(projectList: Seq[Expression], child: SparkPlan)
 /**
  * Run a filter on a batch. The batch will be consumed.
  */
-object GpuFilter {
+object GpuFilter extends Arm {
   def apply(
       batch: ColumnarBatch,
       boundCondition: Expression,
       numOutputRows: SQLMetric,
       numOutputBatches: SQLMetric,
       filterTime: SQLMetric): ColumnarBatch = {
-    val nvtxRange = new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime)
-    try {
-      var filterConditionCv: GpuColumnVector = null
-      var tbl: cudf.Table = null
-      var filtered: cudf.Table = null
-      val filteredBatch = try {
-        filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector]
-        tbl = GpuColumnVector.from(batch)
-        filtered = tbl.filter(filterConditionCv.getBase)
-        GpuColumnVector.from(filtered)
-      } finally {
-        Seq(filtered, tbl, filterConditionCv, batch).safeClose()
-      }
-
+    withResource(new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime)) { _ =>
+      val filteredBatch = GpuFilter(batch, boundCondition)
       numOutputBatches += 1
       numOutputRows += filteredBatch.numRows()
       filteredBatch
+    }
+  }
+
+  def apply(
+      batch: ColumnarBatch,
+      boundCondition: Expression): ColumnarBatch = {
+    var filterConditionCv: GpuColumnVector = null
+    var tbl: cudf.Table = null
+    var filtered: cudf.Table = null
+    try {
+      filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector]
+      tbl = GpuColumnVector.from(batch)
+      filtered = tbl.filter(filterConditionCv.getBase)
+      GpuColumnVector.from(filtered)
     } finally {
-      nvtxRange.close()
+      Seq(filtered, tbl, filterConditionCv, batch).safeClose()
     }
   }
 }
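
Reviewer note, not part of the patch itself: the null filtering added above is safe because a null key can never satisfy an equi-join condition, so for inner-like and semi joins the null-keyed rows can be dropped from either side without changing the result (and from one side only for the outer and anti cases listed in the comments). Below is a minimal, self-contained Scala sketch of that equivalence using plain collections in place of the plugin's GPU types; every name in it is illustrative and none of it is plugin API.

    object NullFilterJoinCheck {
      // An inner equi-join over Option-typed keys. None models a SQL NULL key:
      // under SQL's three-valued logic NULL = NULL is not true, so a null key
      // can never match anything on the other side.
      def innerJoin[K, A, B](
          left: Seq[(Option[K], A)],
          right: Seq[(Option[K], B)]): Seq[(A, B)] =
        for {
          (lk, a) <- left
          (rk, b) <- right
          if lk.isDefined && lk == rk // a None key never compares equal
        } yield (a, b)

      def main(args: Array[String]): Unit = {
        val left  = Seq(Some(1) -> "l1", (None: Option[Int]) -> "l2", Some(2) -> "l3")
        val right = Seq(Some(1) -> "r1", (None: Option[Int]) -> "r2")

        val joined         = innerJoin(left, right)
        val joinedFiltered = innerJoin(
          left.filter(_._1.isDefined), right.filter(_._1.isDefined))

        // Pre-filtering null keys does not change the inner join result;
        // it only shrinks the inputs the join has to process.
        assert(joined == joinedFiltered)
        println(joined) // List((l1,r1))
      }
    }

Dropping those rows up front only shrinks the build and stream inputs, which is where the performance improvement in the commit message comes from.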
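A second note on the memory handling in these hunks: the patch replaces manual try/finally cleanup with the plugin's withResource loan pattern and adds GpuHashJoin.incRefCount so that a batch shared by two owning scopes is released exactly once. The following rough sketch shows how such a pattern behaves; withResource mirrors the helper used in the patch, while RefCounted is an invented stand-in for cuDF's ref-counted column vectors, not the real API.

    object WithResourceSketch {
      // Loan pattern: the resource is closed exactly once, even if the body throws.
      def withResource[R <: AutoCloseable, T](r: R)(body: R => T): T =
        try {
          body(r)
        } finally {
          r.close()
        }

      // Toy stand-in for a ref-counted GPU buffer: close() releases one
      // reference and only frees the resource when the count reaches zero.
      final class RefCounted(name: String) extends AutoCloseable {
        private var refs = 1
        def incRefCount(): RefCounted = { refs += 1; this }
        override def close(): Unit = {
          if (refs <= 0) throw new IllegalStateException(s"$name closed too many times")
          refs -= 1
          if (refs == 0) println(s"$name freed")
        }
      }

      def main(args: Array[String]): Unit = {
        val combined = new RefCounted("combined batch")
        // Two nested owners of the same buffer, mirroring how the patch bumps
        // the reference count on the combined batch before filtering it.
        withResource(combined.incRefCount()) { outer =>
          withResource(combined) { inner =>
            println("both owners are alive here")
          } // the inner owner releases one reference
        }   // the outer owner releases the last reference; the buffer is freed
      }
    }

Running this prints that the buffer is freed only after the last owner's scope exits, which is the property the join code relies on when the built table outlives the batch it was created from.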