diff --git a/integration_tests/src/main/python/tpcds_test.py b/integration_tests/src/main/python/tpcds_test.py index cc91b42bf5db..5b8780af375e 100644 --- a/integration_tests/src/main/python/tpcds_test.py +++ b/integration_tests/src/main/python/tpcds_test.py @@ -23,8 +23,8 @@ 'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b', 'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49', 'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59', - 'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69', - 'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79', + 'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69', + 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79', 'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89', 'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99', 'ss_max', 'ss_maxb'] @@ -35,5 +35,17 @@ @allow_non_gpu(any=True) @pytest.mark.parametrize('query', queries) def test_tpcds(tpcds, query): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : tpcds.do_test_query(query), + conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'}) + +no_var_agg_queries = ['q67', 'q70'] + +@incompat +@ignore_order +@approximate_float +@allow_non_gpu(any=True) +@pytest.mark.parametrize('query', no_var_agg_queries) +def test_tpcds_no_var_agg(tpcds, query): assert_gpu_and_cpu_are_equal_collect( lambda spark : tpcds.do_test_query(query)) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index 610b408d418a..2293d9471a3e 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -137,10 +137,15 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - // TODO clean up intermediate results... - val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) - val combined = combine(keys, broadcastRelation.value.batch) - val ret = GpuColumnVector.from(combined) + val ret = withResource( + GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) + val filtered = filterBuiltTableIfNeeded(combined) + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } + // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala index b3eb7a39fa43..3f49cb465878 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -39,6 +40,11 @@ object GpuHashJoin { } case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") } + + def incRefCount(cb: ColumnarBatch): ColumnarBatch = { + GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) + cb + } } trait GpuHashJoin extends GpuExec with HashJoin { @@ -110,6 +116,67 @@ trait GpuHashJoin extends GpuExec with HashJoin { output.indices.map (v => v + joinLength) } + // Spark adds in rules to filter out nulls for some types of joins, but it does not + // guarantee 100% that all nulls will be filtered out by the time they get to + // this point, but because of https://github.com/rapidsai/cudf/issues/6052 + // we need to filter out the nulls ourselves until it is fixed. + // InnerLike | LeftSemi => + // filter left and right keys + // RightOuter => + // filter left keys + // LeftOuter | LeftAnti => + // filter right keys + + private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { + val builtAnyNullable = gpuBuildKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => builtAnyNullable + case (RightOuter, BuildLeft) => builtAnyNullable + case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable + case _ => false + } + } + + private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { + val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => streamedAnyNullable + case (RightOuter, BuildRight) => streamedAnyNullable + case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable + case _ => false + } + } + + private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = + exprs.zipWithIndex.map { kv => + GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) + }.reduce(GpuAnd) + + private[this] lazy val builtTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuBuildKeys) + + private[this] lazy val streamedTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuStreamedKeys) + + /** + * Filter the builtBatch if needed. builtBatch will be closed. + * @param builtBatch + * @return + */ + def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = + if (shouldFilterBuiltTableForNulls) { + GpuFilter(builtBatch, builtTableNullFilterExpression) + } else { + builtBatch + } + + private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = + if (shouldFilterStreamTableForNulls) { + GpuFilter(streamedBatch, streamedTableNullFilterExpression) + } else { + streamedBatch + } + def doJoin(builtTable: Table, stream: Iterator[ColumnarBatch], boundCondition: Option[Expression], @@ -172,16 +239,14 @@ trait GpuHashJoin extends GpuExec with HashJoin { joinTime: SQLMetric, filterTime: SQLMetric): Option[ColumnarBatch] = { - val streamedTable = try { - val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys) - try { - val combined = combine(streamedKeysBatch, streamedBatch) - GpuColumnVector.from(combined) - } finally { - streamedKeysBatch.close() + val combined = withResource(streamedBatch) { streamedBatch => + withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { + streamedKeysBatch => + GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) } - } finally { - streamedBatch.close() + } + val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => + GpuColumnVector.from(cb) } val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala index a01125e6f5e7..d208d715d58a 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala @@ -129,20 +129,20 @@ case class GpuShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { var combinedSize = 0 + val startTime = System.nanoTime() - val buildBatch = - ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput) - val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) - val builtTable = try { - // Combine does not inc any reference counting - val combined = combine(keys, buildBatch) - combinedSize = - GpuColumnVector.extractColumns(combined) - .map(_.getBase.getDeviceMemorySize).sum.toInt - GpuColumnVector.from(combined) - } finally { - keys.close() - buildBatch.close() + val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( + buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => + withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) + val filtered = filterBuiltTableIfNeeded(combined) + combinedSize = + GpuColumnVector.extractColumns(filtered) + .map(_.getBase.getDeviceMemorySize).sum.toInt + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } } val delta = System.nanoTime() - startTime diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala index a004e0fccb6e..a8307fe789c2 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala @@ -138,10 +138,15 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - // TODO clean up intermediate results... - val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) - val combined = combine(keys, broadcastRelation.value.batch) - val ret = GpuColumnVector.from(combined) + val ret = withResource( + GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) + val filtered = filterBuiltTableIfNeeded(combined) + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } + // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala index b80db78fb288..bebae72e3400 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.joins.HashJoin import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -40,6 +41,11 @@ object GpuHashJoin { } case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") } + + def incRefCount(cb: ColumnarBatch): ColumnarBatch = { + GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) + cb + } } trait GpuHashJoin extends GpuExec with HashJoin { @@ -111,6 +117,67 @@ trait GpuHashJoin extends GpuExec with HashJoin { output.indices.map (v => v + joinLength) } + // Spark adds in rules to filter out nulls for some types of joins, but it does not + // guarantee 100% that all nulls will be filtered out by the time they get to + // this point, but because of https://github.com/rapidsai/cudf/issues/6052 + // we need to filter out the nulls ourselves until it is fixed. + // InnerLike | LeftSemi => + // filter left and right keys + // RightOuter => + // filter left keys + // LeftOuter | LeftAnti => + // filter right keys + + private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { + val builtAnyNullable = gpuBuildKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => builtAnyNullable + case (RightOuter, BuildLeft) => builtAnyNullable + case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable + case _ => false + } + } + + private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { + val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => streamedAnyNullable + case (RightOuter, BuildRight) => streamedAnyNullable + case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable + case _ => false + } + } + + private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = + exprs.zipWithIndex.map { kv => + GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) + }.reduce(GpuAnd) + + private[this] lazy val builtTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuBuildKeys) + + private[this] lazy val streamedTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuStreamedKeys) + + /** + * Filter the builtBatch if needed. builtBatch will be closed. + * @param builtBatch + * @return + */ + def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = + if (shouldFilterBuiltTableForNulls) { + GpuFilter(builtBatch, builtTableNullFilterExpression) + } else { + builtBatch + } + + private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = + if (shouldFilterStreamTableForNulls) { + GpuFilter(streamedBatch, streamedTableNullFilterExpression) + } else { + streamedBatch + } + def doJoin(builtTable: Table, stream: Iterator[ColumnarBatch], boundCondition: Option[Expression], @@ -173,16 +240,14 @@ trait GpuHashJoin extends GpuExec with HashJoin { joinTime: SQLMetric, filterTime: SQLMetric): Option[ColumnarBatch] = { - val streamedTable = try { - val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys) - try { - val combined = combine(streamedKeysBatch, streamedBatch) - GpuColumnVector.from(combined) - } finally { - streamedKeysBatch.close() + val combined = withResource(streamedBatch) { streamedBatch => + withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { + streamedKeysBatch => + GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) } - } finally { - streamedBatch.close() + } + val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => + GpuColumnVector.from(cb) } val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala index ad481219fc64..2aa4f83ad131 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala @@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { var combinedSize = 0 + val startTime = System.nanoTime() - val buildBatch = - ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput) - val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) - val builtTable = try { - // Combine does not inc any reference counting - val combined = combine(keys, buildBatch) - combinedSize = - GpuColumnVector.extractColumns(combined) - .map(_.getBase.getDeviceMemorySize).sum.toInt - GpuColumnVector.from(combined) - } finally { - keys.close() - buildBatch.close() + val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( + buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => + withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) + val filtered = filterBuiltTableIfNeeded(combined) + combinedSize = + GpuColumnVector.extractColumns(filtered) + .map(_.getBase.getDeviceMemorySize).sum.toInt + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } } val delta = System.nanoTime() - startTime diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index ef46a37e8329..45d5c6e4303c 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -143,10 +143,15 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - // TODO clean up intermediate results... - val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) - val combined = combine(keys, broadcastRelation.value.batch) - val ret = GpuColumnVector.from(combined) + val ret = withResource( + GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) + val filtered = filterBuiltTableIfNeeded(combined) + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } + // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala index 27d10c5c0b3e..133267e7f9a6 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.joins.HashJoinWithoutCodegen import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -40,6 +41,11 @@ object GpuHashJoin { } case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") } + + def incRefCount(cb: ColumnarBatch): ColumnarBatch = { + GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) + cb + } } trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { @@ -111,6 +117,67 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { output.indices.map (v => v + joinLength) } + // Spark adds in rules to filter out nulls for some types of joins, but it does not + // guarantee 100% that all nulls will be filtered out by the time they get to + // this point, but because of https://github.com/rapidsai/cudf/issues/6052 + // we need to filter out the nulls ourselves until it is fixed. + // InnerLike | LeftSemi => + // filter left and right keys + // RightOuter => + // filter left keys + // LeftOuter | LeftAnti => + // filter right keys + + private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { + val builtAnyNullable = gpuBuildKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => builtAnyNullable + case (RightOuter, BuildLeft) => builtAnyNullable + case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable + case _ => false + } + } + + private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { + val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => streamedAnyNullable + case (RightOuter, BuildRight) => streamedAnyNullable + case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable + case _ => false + } + } + + private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = + exprs.zipWithIndex.map { kv => + GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) + }.reduce(GpuAnd) + + private[this] lazy val builtTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuBuildKeys) + + private[this] lazy val streamedTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuStreamedKeys) + + /** + * Filter the builtBatch if needed. builtBatch will be closed. + * @param builtBatch + * @return + */ + def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = + if (shouldFilterBuiltTableForNulls) { + GpuFilter(builtBatch, builtTableNullFilterExpression) + } else { + builtBatch + } + + private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = + if (shouldFilterStreamTableForNulls) { + GpuFilter(streamedBatch, streamedTableNullFilterExpression) + } else { + streamedBatch + } + def doJoin(builtTable: Table, stream: Iterator[ColumnarBatch], boundCondition: Option[Expression], @@ -173,16 +240,14 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { joinTime: SQLMetric, filterTime: SQLMetric): Option[ColumnarBatch] = { - val streamedTable = try { - val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys) - try { - val combined = combine(streamedKeysBatch, streamedBatch) - GpuColumnVector.from(combined) - } finally { - streamedKeysBatch.close() + val combined = withResource(streamedBatch) { streamedBatch => + withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { + streamedKeysBatch => + GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) } - } finally { - streamedBatch.close() + } + val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => + GpuColumnVector.from(cb) } val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala index 1d16e38d0733..0fe80ab402a5 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala @@ -130,20 +130,20 @@ case class GpuShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { var combinedSize = 0 + val startTime = System.nanoTime() - val buildBatch = - ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput) - val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) - val builtTable = try { - // Combine does not inc any reference counting - val combined = combine(keys, buildBatch) - combinedSize = - GpuColumnVector.extractColumns(combined) - .map(_.getBase.getDeviceMemorySize).sum.toInt - GpuColumnVector.from(combined) - } finally { - keys.close() - buildBatch.close() + val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( + buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => + withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) + val filtered = filterBuiltTableIfNeeded(combined) + combinedSize = + GpuColumnVector.extractColumns(filtered) + .map(_.getBase.getDeviceMemorySize).sum.toInt + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } } val delta = System.nanoTime() - startTime diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index ae4c9f466147..a783e9059391 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -94,32 +94,34 @@ case class GpuProjectExec(projectList: Seq[Expression], child: SparkPlan) /** * Run a filter on a batch. The batch will be consumed. */ -object GpuFilter { +object GpuFilter extends Arm { def apply( batch: ColumnarBatch, boundCondition: Expression, numOutputRows: SQLMetric, numOutputBatches: SQLMetric, filterTime: SQLMetric): ColumnarBatch = { - val nvtxRange = new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime) - try { - var filterConditionCv: GpuColumnVector = null - var tbl: cudf.Table = null - var filtered: cudf.Table = null - val filteredBatch = try { - filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector] - tbl = GpuColumnVector.from(batch) - filtered = tbl.filter(filterConditionCv.getBase) - GpuColumnVector.from(filtered) - } finally { - Seq(filtered, tbl, filterConditionCv, batch).safeClose() - } - + withResource(new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime)) { _ => + val filteredBatch = GpuFilter(batch, boundCondition) numOutputBatches += 1 numOutputRows += filteredBatch.numRows() filteredBatch + } + } + + def apply( + batch: ColumnarBatch, + boundCondition: Expression) : ColumnarBatch = { + var filterConditionCv: GpuColumnVector = null + var tbl: cudf.Table = null + var filtered: cudf.Table = null + try { + filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector] + tbl = GpuColumnVector.from(batch) + filtered = tbl.filter(filterConditionCv.getBase) + GpuColumnVector.from(filtered) } finally { - nvtxRange.close() + Seq(filtered, tbl, filterConditionCv, batch).safeClose() } } }