From 6465e518ace7062f0f099b8e03dc39157e267630 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 3 May 2021 13:25:01 -0500 Subject: [PATCH] Allow batching the output of a join (#2310) Signed-off-by: Robert (Bobby) Evans --- integration_tests/src/main/python/asserts.py | 4 +- .../src/main/python/join_test.py | 9 +- .../spark300/GpuBroadcastHashJoinExec.scala | 34 +- .../spark300/GpuShuffledHashJoinExec.scala | 2 +- .../spark301/GpuBroadcastHashJoinExec.scala | 34 +- .../spark301db/GpuBroadcastHashJoinExec.scala | 34 +- .../spark301db/GpuShuffledHashJoinExec.scala | 20 +- .../spark311/GpuBroadcastHashJoinExec.scala | 34 +- .../spark311/GpuShuffledHashJoinExec.scala | 4 +- .../nvidia/spark/rapids/GpuColumnVector.java | 5 +- .../spark/rapids/GpuBoundAttribute.scala | 10 +- .../rapids/GpuShuffledHashJoinBase.scala | 46 +- .../nvidia/spark/rapids/JoinGatherer.scala | 569 ++++++++++++++ .../com/nvidia/spark/rapids/MetaUtils.scala | 12 + .../spark/rapids/SpillableColumnarBatch.scala | 70 +- .../GpuBroadcastNestedLoopJoinExec.scala | 2 +- .../sql/rapids/execution/GpuHashJoin.scala | 701 +++++++++++------- 17 files changed, 1193 insertions(+), 397 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index 4af5cd0d2f8..c0464a476aa 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -28,7 +28,7 @@ def _assert_equal(cpu, gpu, float_check, path): t = type(cpu) if (t is Row): - assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {}".format(path) + assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu)) if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"): for field in cpu.__fields__: _assert_equal(cpu[field], gpu[field], float_check, path + [field]) @@ -36,7 +36,7 @@ def _assert_equal(cpu, gpu, float_check, path): for index in range(len(cpu)): _assert_equal(cpu[index], gpu[index], float_check, path + [index]) elif (t is list): - assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {}".format(path) + assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu)) for index in range(len(cpu)): _assert_equal(cpu[index], gpu[index], float_check, path + [index]) elif (t is pytypes.GeneratorType): diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index fcc960bc0b9..46327ba0567 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -110,7 +110,7 @@ def do_join(spark): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn) -@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_sortmerge_join_array(data_gen, join_type, batch_size): def do_join(spark): left, right = create_nested_df(spark, short_gen, data_gen, 500, 500) @@ -132,11 +132,14 @@ def do_join(spark): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', 
[all_basic_struct_gen], ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn) -def test_sortmerge_join_struct(data_gen, join_type): +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test out of core joins too +def test_sortmerge_join_struct(data_gen, join_type, batch_size): def do_join(spark): left, right = create_nested_df(spark, short_gen, data_gen, 500, 500) return left.join(right, left.key == right.r_key, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) + conf = {'spark.rapids.sql.batchSizeBytes': batch_size} + conf.update(_sortmerge_join_conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) # For spark to insert a shuffled hash join it has to be enabled with # "spark.sql.join.preferSortMergeJoin" = "false" and both sides have to diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index 7af0a3c3d34..e9930f5f48a 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -89,7 +89,7 @@ case class GpuBroadcastHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { import GpuMetric._ @@ -100,7 +100,7 @@ case class GpuBroadcastHashJoinExec( JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS), STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME), JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME), - FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) + FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) @@ -141,28 +141,20 @@ case class GpuBroadcastHashJoinExec( val filterTime = gpuLongMetric(FILTER_TIME) val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS) - val broadcastRelation = broadcastExchange - .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() + val spillCallback = GpuMetric.makeSpillCallback(allMetrics) - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) - lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - withResource(combined) { combined => - filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) - } - } - - // Don't warn for a leak, because we cannot control when we are done with this - (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) - ret - } + val broadcastRelation = broadcastExchange + .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() val rdd = streamedPlan.executeColumnar() - rdd.mapPartitions(it => - doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, - numOutputBatches, streamTime, joinTime, 
filterTime, totalTime)) + rdd.mapPartitions { it => + val builtBatch = broadcastRelation.value.batch + GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected()) + doJoin(builtBatch, it, targetSize, spillCallback, + numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime, + filterTime, totalTime) + } } } diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala index ff87fe64af9..58700c594aa 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala @@ -75,7 +75,7 @@ case class GpuShuffledHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan, override val isSkewJoin: Boolean) diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index 881f12c8d88..c3ac7040458 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -87,7 +87,7 @@ case class GpuBroadcastHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { import GpuMetric._ @@ -98,7 +98,7 @@ case class GpuBroadcastHashJoinExec( JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS), STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME), JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME), - FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) + FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) @@ -139,28 +139,20 @@ case class GpuBroadcastHashJoinExec( val filterTime = gpuLongMetric(FILTER_TIME) val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS) - val broadcastRelation = broadcastExchange - .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() + val spillCallback = GpuMetric.makeSpillCallback(allMetrics) - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) - lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - withResource(combined) { combined => - filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) - } - } - - // Don't warn for a leak, because we cannot control when we are done with this - (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) - ret - } + val broadcastRelation = broadcastExchange + .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() val rdd = 
streamedPlan.executeColumnar() - rdd.mapPartitions(it => - doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, - numOutputBatches, streamTime, joinTime, filterTime, totalTime)) + rdd.mapPartitions { it => + val builtBatch = broadcastRelation.value.batch + GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected()) + doJoin(builtBatch, it, targetSize, spillCallback, + numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime, + filterTime, totalTime) + } } } diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala index 875f91eed4c..92f63db58c1 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala @@ -86,7 +86,7 @@ case class GpuBroadcastHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { import GpuMetric._ @@ -97,7 +97,7 @@ case class GpuBroadcastHashJoinExec( JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS), STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME), JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME), - FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) + FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) @@ -138,28 +138,20 @@ case class GpuBroadcastHashJoinExec( val filterTime = gpuLongMetric(FILTER_TIME) val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS) - val broadcastRelation = broadcastExchange - .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() + val spillCallback = GpuMetric.makeSpillCallback(allMetrics) - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) - lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - withResource(combined) { combined => - filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) - } - } - - // Don't warn for a leak, because we cannot control when we are done with this - (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) - ret - } + val broadcastRelation = broadcastExchange + .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() val rdd = streamedPlan.executeColumnar() - rdd.mapPartitions(it => - doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, - numOutputBatches, streamTime, joinTime, filterTime, totalTime)) + rdd.mapPartitions { it => + val builtBatch = broadcastRelation.value.batch + GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected()) + doJoin(builtBatch, it, targetSize, spillCallback, + numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime, + filterTime, totalTime) + } } } diff --git 
a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala index a957a364812..f0ea0169672 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala @@ -58,15 +58,15 @@ class GpuShuffledHashJoinMeta( } override def convertToGpu(): GpuExec = { - val Seq(leftChild, rightChild) = childPlans.map(_.convertIfNeeded()) + val Seq(left, right) = childPlans.map(_.convertIfNeeded) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), - leftChild, - rightChild) + left, + right) } } @@ -75,12 +75,12 @@ case class GpuShuffledHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan) - extends GpuShuffledHashJoinBase( - leftKeys, - rightKeys, - buildSide, - condition, - isSkewJoin = false) + extends GpuShuffledHashJoinBase( + leftKeys, + rightKeys, + buildSide, + condition, + isSkewJoin = false) diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala index f63b70408bc..0b81f0354e0 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala @@ -91,7 +91,7 @@ case class GpuBroadcastHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { import GpuMetric._ @@ -102,7 +102,7 @@ case class GpuBroadcastHashJoinExec( JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS), STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME), JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME), - FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) + FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) @@ -143,28 +143,20 @@ case class GpuBroadcastHashJoinExec( val filterTime = gpuLongMetric(FILTER_TIME) val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS) - val broadcastRelation = broadcastExchange - .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() + val spillCallback = GpuMetric.makeSpillCallback(allMetrics) - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) - lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - withResource(combined) { combined => - filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) - } - } - - // Don't 
warn for a leak, because we cannot control when we are done with this - (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) - ret - } + val broadcastRelation = broadcastExchange + .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() val rdd = streamedPlan.executeColumnar() - rdd.mapPartitions(it => - doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, - numOutputBatches, streamTime, joinTime, filterTime, totalTime)) + rdd.mapPartitions { it => + val builtBatch = broadcastRelation.value.batch + GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected()) + doJoin(builtBatch, it, targetSize, spillCallback, + numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime, + filterTime, totalTime) + } } } diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala index e25927e0c28..ac092c2a7c6 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala @@ -58,7 +58,7 @@ class GpuShuffledHashJoinMeta( } override def convertToGpu(): GpuExec = { - val Seq(left, right) = childPlans.map(_.convertIfNeeded()) + val Seq(left, right) = childPlans.map(_.convertIfNeeded) GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), @@ -76,7 +76,7 @@ case class GpuShuffledHashJoinExec( rightKeys: Seq[Expression], joinType: JoinType, buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], left: SparkPlan, right: SparkPlan, override val isSkewJoin: Boolean) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index a028e5e06db..7ea5eaf0dbb 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -81,7 +81,7 @@ public static synchronized void debug(String name, ColumnarBatch cb) { * @param name the name of the column to print out. * @param col the column to print out. 
*/ - public static synchronized void debug(String name, ai.rapids.cudf.ColumnVector col) { + public static synchronized void debug(String name, ai.rapids.cudf.ColumnView col) { try (HostColumnVector hostCol = col.copyToHost()) { debug(name, hostCol); } @@ -671,7 +671,8 @@ static boolean typeConversionAllowed(Table table, DataType[] colTypes, int start */ static boolean typeConversionAllowed(Table table, DataType[] colTypes) { final int numColumns = table.getNumberOfColumns(); - assert numColumns == colTypes.length: "The number of columns and the number of types don't match"; + assert numColumns == colTypes.length: "The number of columns and the number of types don't " + + "match " + table + " " + Arrays.toString(colTypes); boolean ret = true; for (int colIndex = 0; colIndex < numColumns; colIndex++) { ret = ret && typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex]); diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala index f01f63f3e80..3aed390246f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala @@ -96,6 +96,14 @@ case class GpuBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean override def toString: String = s"input[$ordinal, ${dataType.simpleString}, $nullable]" override def columnarEval(batch: ColumnarBatch): Any = { - batch.column(ordinal).asInstanceOf[GpuColumnVector].incRefCount() + batch.column(ordinal) match { + case fb: GpuColumnVectorFromBuffer => + // When doing a project we might re-order columns or do other things that make it + // no longer look like the original contiguous buffer it came from, so to avoid it + // appearing to downstream processing as the same buffer we change the type here.
+ new GpuColumnVector(fb.dataType(), fb.getBase.incRefCount()) + case cv: GpuColumnVector => cv.incRefCount() + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala index e7ab29acf1a..f9ee1c62d15 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala @@ -16,10 +16,9 @@ package com.nvidia.spark.rapids -import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.FullOuter import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} import org.apache.spark.sql.execution.BinaryExecNode @@ -30,7 +29,7 @@ abstract class GpuShuffledHashJoinBase( leftKeys: Seq[Expression], rightKeys: Seq[Expression], buildSide: GpuBuildSide, - condition: Option[Expression], + override val condition: Option[Expression], val isSkewJoin: Boolean) extends BinaryExecNode with GpuHashJoin { import GpuMetric._ @@ -42,7 +41,7 @@ abstract class GpuShuffledHashJoinBase( STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME), JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME), JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS), - FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) + FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics override def requiredChildDistribution: Seq[Distribution] = HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil @@ -68,37 +67,26 @@ abstract class GpuShuffledHashJoinBase( val joinTime = gpuLongMetric(JOIN_TIME) val filterTime = gpuLongMetric(FILTER_TIME) val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS) - - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) + val spillCallback = GpuMetric.makeSpillCallback(allMetrics) + val localBuildOutput: Seq[Attribute] = buildPlan.output streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { - var combinedSize = 0 val startTime = System.nanoTime() - val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) withResource(combined) { combined => combinedSize = GpuColumnVector.extractColumns(combined) .map(_.getBase.getDeviceMemorySize).sum.toInt filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) } } } - val delta = System.nanoTime() - startTime - buildTime += delta - totalTime += delta - buildDataSize += combinedSize - val context = TaskContext.get() - context.addTaskCompletionListener[Unit](_ => builtTable.close()) + withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, + localBuildOutput)) { builtBatch => + // doJoin will increment the reference counts as needed for the builtBatch + val delta = System.nanoTime() - startTime + buildTime += delta + totalTime += 
delta + buildDataSize += GpuColumnVector.getTotalDeviceMemoryUsed(builtBatch) - doJoin(builtTable, streamIter, boundCondition, - numOutputRows, joinOutputRows, numOutputBatches, - streamTime, joinTime, filterTime, totalTime) + doJoin(builtBatch, streamIter, targetSize, spillCallback, + numOutputRows, joinOutputRows, numOutputBatches, + streamTime, joinTime, filterTime, totalTime) + } } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala new file mode 100644 index 00000000000..3bf18c354d2 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala @@ -0,0 +1,569 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.{ColumnView, DeviceMemoryBuffer, DType, GatherMap, NvtxColor, NvtxRange, OrderByArg, Scalar, Table} +import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback + +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataType, DateType, DecimalType, LongType, NullType, NumericType, StringType, StructType, TimestampType} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Holds something that can be spilled if it is marked as such, but it does not modify the + * data until it is ready to be spilled. This avoids the performance penalty of reformatting + * the underlying data so it is ready to be spilled. + * + * Call `allowSpilling` to indicate that the data can be released for spilling and call `close` + * to indicate that the data is not needed any longer. + * + * If the data is needed after `allowSpilling` is called, the implementation should get the data + * back and cache it again until allowSpilling is called once more. + */ +trait LazySpillable extends AutoCloseable { + + /** + * Indicate that we are done using the data for now and it can be spilled. + * + * This method should not have issues with being called multiple times without the data being + * accessed. + */ + def allowSpilling(): Unit +} + +/** + * Generic trait for all join gather instances. A JoinGatherer takes the gather maps that are the + * result of a cudf join call along with the data batches that need to be gathered, and allows + * the caller to materialize the join in batches. It also provides APIs to help decide on how + * many rows to gather. + * + * This is a LazySpillable instance so the life cycle follows that too. + */ +trait JoinGatherer extends LazySpillable with Arm { + /** + * Gather the next n rows from the join gather maps. + * + * @param n how many rows to gather + * @return the gathered data as a ColumnarBatch + */ + def gatherNext(n: Int): ColumnarBatch + + /** + * Whether all of the data has been gathered so far. 
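+ *
+ * A typical consumer drains the gatherer in a loop. A minimal, hedged sketch (it assumes a
+ * `gatherer` and a `targetSize` in bytes are in scope, and uses only APIs from this file):
+ * {{{
+ *   while (!gatherer.isDone) {
+ *     val numRows = JoinGatherer.getRowsInNextBatch(gatherer, targetSize)
+ *     withResource(gatherer.gatherNext(numRows)) { batch =>
+ *       // hand the batch downstream
+ *     }
+ *     gatherer.allowSpilling() // done with the data for now, let it spill
+ *   }
+ *   gatherer.close()
+ * }}}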
*/ + def isDone: Boolean + + /** + * Number of rows left to gather + */ + def numRowsLeft: Long + + /** + * A really fast and dirty way to estimate the size of each row in the join output, measured in + * bytes. + */ + def realCheapPerRowSizeEstimate: Double + + /** + * Get the bit count size map for the next n rows to be gathered. It returns a column of + * INT64 values, one for each of the next n rows requested. This is a bit count to deal with + * validity bits, etc. This is an INT64 to allow a prefix sum (running total) to be done on + * it without overflowing so we can compute an accurate cutoff point for a batch size limit. + */ + def getBitSizeMap(n: Int): ColumnView + + /** + * If the data is all fixed width, return the size of each row; otherwise return None. + */ + def getFixedWidthBitSize: Option[Int] + + /** + * Do a complete/expensive job to get the number of rows that can be gathered to get close + * to the targetSize for the final output. + * + * @param targetSize The target size in bytes for the final output batch. + */ + def gatherRowEstimate(targetSize: Long): Int = { + val bitSizePerRow = getFixedWidthBitSize + if (bitSizePerRow.isDefined) { + Math.min(Math.min((targetSize / bitSizePerRow.get) / 8, numRowsLeft), Integer.MAX_VALUE).toInt + } else { + // WARNING magic number below. The rowEstimateMultiplier is arbitrary: we want to include + // enough rows that we go over the target size, but not so many that we waste memory. + // It could probably be tuned better. + val rowEstimateMultiplier = 1.1 + val estimatedRows = Math.min( + ((targetSize / realCheapPerRowSizeEstimate) * rowEstimateMultiplier).toLong, + numRowsLeft) + val numRowsToProbe = Math.min(estimatedRows, Integer.MAX_VALUE).toInt + if (numRowsToProbe <= 0) { + 1 + } else { + val sum = withResource(getBitSizeMap(numRowsToProbe)) { bitSizes => + bitSizes.prefixSum() + } + val cutoff = withResource(sum) { sum => + // Lower bound needs tables, so we have to wrap everything in tables... 
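+ // For example (illustrative numbers, not from the code): per-row bit sizes
+ // [8, 16, 8] prefix-sum to [8, 24, 32]; lowerBound against targetSize * 8 bits
+ // returns the index of the first running total that reaches the bound, i.e. the
+ // number of rows whose running total stays below the target.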
withResource(new Table(sum)) { sumTable => + withResource(ai.rapids.cudf.ColumnVector.fromLongs(targetSize * 8)) { bound => + withResource(new Table(bound)) { boundTab => + sumTable.lowerBound(boundTab, OrderByArg.asc(0)) + } + } + } + } + withResource(cutoff) { cutoff => + withResource(cutoff.copyToHost()) { hostCutoff => + Math.max(1, hostCutoff.getInt(0)) + } + } + } + } + } +} + +object JoinGatherer extends Arm { + def apply(gatherMap: LazySpillableGatherMap, + inputData: LazySpillableColumnarBatch): JoinGatherer = + new JoinGathererImpl(gatherMap, inputData) + + def apply(leftMap: LazySpillableGatherMap, + leftData: LazySpillableColumnarBatch, + rightMap: LazySpillableGatherMap, + rightData: LazySpillableColumnarBatch): JoinGatherer = { + val left = JoinGatherer(leftMap, leftData) + val right = JoinGatherer(rightMap, rightData) + MultiJoinGather(left, right) + } + + def getRowsInNextBatch(gatherer: JoinGatherer, targetSize: Long): Int = { + withResource(new NvtxRange("calc gather size", NvtxColor.YELLOW)) { _ => + val rowsLeft = gatherer.numRowsLeft + val rowEstimate: Long = gatherer.getFixedWidthBitSize match { + case Some(fixedSize) => + // Odd corner cases for tests, make sure we do at least one row + Math.max(1, (targetSize / fixedSize) / 8) + case None => + // Heuristic to see if we need to do the expensive calculation + if (rowsLeft * gatherer.realCheapPerRowSizeEstimate <= targetSize * 0.75) { + rowsLeft + } else { + gatherer.gatherRowEstimate(targetSize) + } + } + Math.min(Math.min(rowEstimate, rowsLeft), Integer.MAX_VALUE).toInt + } + } +} + + +/** + * Holds a Columnar batch that is LazySpillable. + */ +trait LazySpillableColumnarBatch extends LazySpillable { + /** + * How many rows are in the underlying batch. Should not unspill the batch to get this info. + */ + def numRows: Int + + /** + * How many columns are in the underlying batch. Should not unspill the batch to get this info. + */ + def numCols: Int + + /** + * The amount of device memory in bytes that the underlying batch uses. Should not unspill the + * batch to get this info. + */ + def deviceMemorySize: Long + + /** + * The data types of the underlying batch's columns. Should not unspill the batch to get this + * info. + */ + def dataTypes: Array[DataType] + + + /** + * Get the batch that this wraps and unspill it if needed. + */ + def getBatch: ColumnarBatch + +} + +object LazySpillableColumnarBatch { + def apply(cb: ColumnarBatch, + spillCallback: SpillCallback, + name: String): LazySpillableColumnarBatch = + new LazySpillableColumnarBatchImpl(cb, spillCallback, name) + + def spillOnly(wrapped: LazySpillableColumnarBatch): LazySpillableColumnarBatch = wrapped match { + case alreadyGood: AllowSpillOnlyLazySpillableColumnarBatchImpl => alreadyGood + case anythingElse => AllowSpillOnlyLazySpillableColumnarBatchImpl(anythingElse) + } +} + +/** + * A version of `LazySpillableColumnarBatch` where instead of closing the underlying + * batch it is only spilled. This is used for cases, like with a streaming hash join, + * where the data itself needs to outlive the JoinGatherer it is handed off to. 
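+ *
+ * A hedged sketch of the intended pattern (it assumes a `buildCb: ColumnarBatch` and a
+ * `spillCallback` are in scope), mirroring how the hash join reuses the build side:
+ * {{{
+ *   val build = LazySpillableColumnarBatch(buildCb, spillCallback, "build_data")
+ *   // Each JoinGatherer gets a spill-only view; closing that view only spills the
+ *   // data, so `build` can also be handed to the gatherer for the next stream batch.
+ *   val forGatherer = LazySpillableColumnarBatch.spillOnly(build)
+ * }}}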
+ */ +case class AllowSpillOnlyLazySpillableColumnarBatchImpl(wrapped: LazySpillableColumnarBatch) + extends LazySpillableColumnarBatch { + override def getBatch: ColumnarBatch = + wrapped.getBatch + + override def numRows: Int = wrapped.numRows + override def numCols: Int = wrapped.numCols + override def deviceMemorySize: Long = wrapped.deviceMemorySize + override def dataTypes: Array[DataType] = wrapped.dataTypes + + override def allowSpilling(): Unit = + wrapped.allowSpilling() + + override def close(): Unit = { + // Don't actually close it, we don't own it, just allow it to be spilled. + wrapped.allowSpilling() + } +} + +/** + * Holds a columnar batch that is cached until it is marked that it can be spilled. + */ +class LazySpillableColumnarBatchImpl( + cb: ColumnarBatch, + spillCallback: SpillCallback, + name: String) extends LazySpillableColumnarBatch with Arm { + + private var cached: Option[ColumnarBatch] = Some(GpuColumnVector.incRefCounts(cb)) + private var spill: Option[SpillableColumnarBatch] = None + override val numRows: Int = cb.numRows() + override val deviceMemorySize: Long = GpuColumnVector.getTotalDeviceMemoryUsed(cb) + override val dataTypes: Array[DataType] = GpuColumnVector.extractTypes(cb) + override val numCols: Int = dataTypes.length + + override def getBatch: ColumnarBatch = { + if (cached.isEmpty) { + withResource(new NvtxRange("get batch " + name, NvtxColor.RED)) { _ => + cached = spill.map(_.getColumnarBatch()) + } + } + cached.get + } + + override def allowSpilling(): Unit = { + if (spill.isEmpty && cached.isDefined) { + withResource(new NvtxRange("spill batch " + name, NvtxColor.RED)) { _ => + // First time we need to allow for spilling + spill = Some(SpillableColumnarBatch(cached.get, + SpillPriorities.ACTIVE_ON_DECK_PRIORITY, + spillCallback)) + // Putting data in a SpillableColumnarBatch takes ownership of it. + cached = None + } + } + cached.foreach(_.close()) + cached = None + } + + override def close(): Unit = { + cached.foreach(_.close()) + cached = None + spill.foreach(_.close()) + spill = None + } +} + +/** + * Holds a gather map that is also lazy spillable. + */ +class LazySpillableGatherMap( + map: GatherMap, + spillCallback: SpillCallback, + name: String) extends LazySpillable with Arm { + + val getRowCount: Long = map.getRowCount + + private var cached: Option[DeviceMemoryBuffer] = Some(map.releaseBuffer()) + private var spill: Option[SpillableBuffer] = None + + /** + * Get a ColumnView that can be used to do a cudf gather. + */ + def toColumnView(startRow: Long, numRows: Int): ColumnView = { + ColumnView.fromDeviceBuffer(getBuffer, startRow * 4L, DType.INT32, numRows) + } + + private def getBuffer = { + if (cached.isEmpty) { + withResource(new NvtxRange("get map " + name, NvtxColor.RED)) { _ => + cached = spill.map(_.getDeviceBuffer()) + } + } + cached.get + } + + override def allowSpilling(): Unit = { + if (spill.isEmpty && cached.isDefined) { + withResource(new NvtxRange("spill map " + name, NvtxColor.RED)) { _ => + // First time we need to allow for spilling + spill = Some(SpillableBuffer(cached.get, + SpillPriorities.ACTIVE_ON_DECK_PRIORITY, + spillCallback)) + // Putting data in a SpillableBuffer takes ownership of it. + cached = None + } + } + cached.foreach(_.close()) + cached = None + } + + override def close(): Unit = { + cached.foreach(_.close()) + cached = None + spill.foreach(_.close()) + spill = None + } +} + +object JoinGathererImpl { + + /** + * Calculate the row size in bits for a fixed width schema. 
If a type is encountered that is + * not fixed width, or is not known, None is returned. + */ + def fixedWidthRowSizeBits(dts: Seq[DataType]): Option[Int] = + sumRowSizesBits(dts, nullValueCalc = false) + + /** + * Calculate the null row size for a given schema in bits. If an unexpected type is encountered, + * an exception is thrown. + */ + def nullRowSizeBits(dts: Seq[DataType]): Int = + sumRowSizesBits(dts, nullValueCalc = true).get + + + /** + * Sum the row sizes for each data type passed in. If any one of the sizes is not available, + * the entire result is considered to not be available. If nullValueCalc is true, a result is + * guaranteed to be returned or an exception thrown. + */ + private def sumRowSizesBits(dts: Seq[DataType], nullValueCalc: Boolean): Option[Int] = { + val allOptions = dts.map(calcRowSizeBits(_, nullValueCalc)) + if (allOptions.exists(_.isEmpty)) { + None + } else { + Some(allOptions.map(_.get).sum + 1) + } + } + + /** + * Calculate the row bit size for the given data type. If nullValueCalc is false, + * then variable width types and unexpected types will result in a None being returned. + * If it is true, variable width types will have a value returned that corresponds to a + * null, and unknown types will throw an exception. + */ + private def calcRowSizeBits(dt: DataType, nullValueCalc: Boolean): Option[Int] = dt match { + case StructType(fields) => + sumRowSizesBits(fields.map(_.dataType), nullValueCalc) + case dt: DecimalType if dt.precision > DType.DECIMAL64_MAX_PRECISION => + if (nullValueCalc) { + throw new IllegalArgumentException(s"Found an unsupported type $dt") + } else { + None + } + case _: NumericType | DateType | TimestampType | BooleanType | NullType => + Some(GpuColumnVector.getNonNestedRapidsType(dt).getSizeInBytes * 8 + 1) + case StringType | BinaryType | ArrayType(_, _) if nullValueCalc => + // Single offset value and a validity value + Some((DType.INT32.getSizeInBytes * 8) + 1) + case x if nullValueCalc => + throw new IllegalArgumentException(s"Found an unsupported type $x") + case _ => None + } +} + +/** + * JoinGatherer for a single map/table + */ +class JoinGathererImpl( + private val gatherMap: LazySpillableGatherMap, + private val data: LazySpillableColumnarBatch) extends JoinGatherer { + + // How much of the gather map we have output so far + private var gatheredUpTo: Long = 0 + private val totalRows: Long = gatherMap.getRowCount + private val (fixedWidthRowSizeBits, nullRowSizeBits) = { + val dts = data.dataTypes + val fw = JoinGathererImpl.fixedWidthRowSizeBits(dts) + val nullVal = JoinGathererImpl.nullRowSizeBits(dts) + (fw, nullVal) + } + + override def realCheapPerRowSizeEstimate: Double = { + val totalInputRows: Int = data.numRows + val totalInputSize: Long = data.deviceMemorySize + // Avoid divide by 0 here and later on + if (totalInputRows > 0 && totalInputSize > 0) { + totalInputSize.toDouble / totalInputRows + } else { + 1.0 + } + } + + override def getFixedWidthBitSize: Option[Int] = fixedWidthRowSizeBits + + override def gatherNext(n: Int): ColumnarBatch = { + val start = gatheredUpTo + assert((start + n) <= totalRows) + val ret = withResource(gatherMap.toColumnView(start, n)) { gatherView => + val batch = data.getBatch + val gatheredTable = withResource(GpuColumnVector.from(batch)) { table => + table.gather(gatherView) + } + withResource(gatheredTable) { gt => + GpuColumnVector.from(gt, GpuColumnVector.extractTypes(batch)) + } + } + gatheredUpTo += n + ret + } + + override def isDone: Boolean = + gatheredUpTo >= totalRows + + 
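+ // Note: gatheredUpTo only advances in gatherNext, so isDone and numRowsLeft below are
+ // cheap, consistent reads of how much of the single gather map has been consumed.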
override def numRowsLeft: Long = totalRows - gatheredUpTo + + override def allowSpilling(): Unit = { + data.allowSpilling() + gatherMap.allowSpilling() + } + + override def getBitSizeMap(n: Int): ColumnView = { + val cb = data.getBatch + val inputBitCounts = withResource(GpuColumnVector.from(cb)) { table => + withResource(table.rowBitCount()) { bits => + bits.castTo(DType.INT64) + } + } + // Gather the bit counts so we know what the output table will look like + val gatheredBitCount = withResource(inputBitCounts) { inputBitCounts => + withResource(gatherMap.toColumnView(gatheredUpTo, n)) { gatherView => + // Gather only works on a table so wrap the single column + val gatheredTab = withResource(new Table(inputBitCounts)) { table => + table.gather(gatherView) + } + withResource(gatheredTab) { gatheredTab => + gatheredTab.getColumn(0).incRefCount() + } + } + } + // The gather could have introduced nulls in the case of outer joins. Because of that + // we need to replace them with an appropriate size + if (gatheredBitCount.hasNulls) { + withResource(gatheredBitCount) { gatheredBitCount => + withResource(Scalar.fromLong(nullRowSizeBits.toLong)) { nullSize => + withResource(gatheredBitCount.isNull) { nullMask => + nullMask.ifElse(nullSize, gatheredBitCount) + } + } + } + } else { + gatheredBitCount + } + } + + override def close(): Unit = { + gatherMap.close() + data.close() + } +} + +/** + * Join Gatherer for a left table and a right table + */ +case class MultiJoinGather(left: JoinGatherer, right: JoinGatherer) extends JoinGatherer { + assert(left.numRowsLeft == right.numRowsLeft, + "all gatherers must have the same number of rows to gather") + + override def gatherNext(n: Int): ColumnarBatch = { + withResource(left.gatherNext(n)) { leftGathered => + withResource(right.gatherNext(n)) { rightGathered => + val vectors = Seq(leftGathered, rightGathered).flatMap { batch => + (0 until batch.numCols()).map { i => + val col = batch.column(i) + col.asInstanceOf[GpuColumnVector].incRefCount() + col + } + }.toArray + new ColumnarBatch(vectors, n) + } + } + } + + override def isDone: Boolean = left.isDone + + override def numRowsLeft: Long = left.numRowsLeft + + override def allowSpilling(): Unit = { + left.allowSpilling() + right.allowSpilling() + } + + override def realCheapPerRowSizeEstimate: Double = + left.realCheapPerRowSizeEstimate + right.realCheapPerRowSizeEstimate + + override def getBitSizeMap(n: Int): ColumnView = { + (left.getFixedWidthBitSize, right.getFixedWidthBitSize) match { + case (Some(l), Some(r)) => + // This should never happen because all fixed width should be covered by + // a faster code path. But just in case we provide it anyway. 
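+ // Both sides are fixed width, so every output row costs the same (l + r) bits and
+ // the bit size map is just a constant INT64 column of length n.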
+ withResource(GpuScalar.from(l.toLong + r.toLong, LongType)) { s => + ai.rapids.cudf.ColumnVector.fromScalar(s, n) + } + case (Some(l), None) => + withResource(GpuScalar.from(l.toLong, LongType)) { ls => + withResource(right.getBitSizeMap(n)) { rightBits => + ls.add(rightBits, DType.INT64) + } + } + case (None, Some(r)) => + withResource(GpuScalar.from(r.toLong, LongType)) { rs => + withResource(left.getBitSizeMap(n)) { leftBits => + rs.add(leftBits, DType.INT64) + } + } + case _ => + withResource(left.getBitSizeMap(n)) { leftBits => + withResource(right.getBitSizeMap(n)) { rightBits => + leftBits.add(rightBits, DType.INT64) + } + } + } + } + + override def getFixedWidthBitSize: Option[Int] = { + (left.getFixedWidthBitSize, right.getFixedWidthBitSize) match { + case (Some(l), Some(r)) => Some(l + r) + case _ => None + } + } + + override def close(): Unit = { + left.close() + right.close() + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala index 300fa1bceed..afa5fdf39e9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala @@ -146,6 +146,18 @@ object MetaUtils extends Arm { } } + /** + * This is a hack to create a table meta that passed muster, but is not really going to be used + */ + lazy val ignoreTableMeta: TableMeta = { + val fbb = new FlatBufferBuilder(1024) + TableMeta.startTableMeta(fbb) + TableMeta.addRowCount(fbb, 0) + fbb.finish(TableMeta.endTableMeta(fbb)) + // copy the message to trim the backing array to only what is needed + TableMeta.getRootAsTableMeta(ByteBuffer.wrap(fbb.sizedByteArray())) + } + /** * Construct a table from a contiguous device buffer and a * `TableMeta` message describing the schema of the buffer data. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index c1b17d1e40c..8164f8b8595 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.ContiguousTable +import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer} import org.apache.spark.TaskContext import org.apache.spark.sql.rapids.TempSpillBufferId @@ -208,4 +208,72 @@ object SpillableColumnarBatch extends Arm { } } } +} + + +/** + * Just like a SpillableColumnarBatch but for buffers. + */ +class SpillableBuffer (id: TempSpillBufferId) extends AutoCloseable with Arm { + private var closed = false + + /** + * The ID that this is stored under. + * @note Use with caution because if this has been closed the id is no longer valid. + */ + def spillId: TempSpillBufferId = id + + lazy val sizeInBytes: Long = + withResource(RapidsBufferCatalog.acquireBuffer(id)) { buff => + buff.size + } + + /** + * Set a new spill priority. + */ + def setSpillPriority(priority: Long): Unit = { + withResource(RapidsBufferCatalog.acquireBuffer(id)) { rapidsBuffer => + rapidsBuffer.setSpillPriority(priority) + } + } + + /** + * Get the device buffer. + * @note It is the responsibility of the caller to close the buffer. 
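+ *
+ * A minimal usage sketch (hedged; `devBuffer`, `priority` and `spillCallback` are
+ * assumed to be in scope):
+ * {{{
+ *   // the companion apply below takes ownership of devBuffer
+ *   val spillable = SpillableBuffer(devBuffer, priority, spillCallback)
+ *   withResource(spillable.getDeviceBuffer()) { dev =>
+ *     // use dev; it is valid even if the buffer spilled and was brought back
+ *   }
+ * }}}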
+ */ + def getDeviceBuffer(): DeviceMemoryBuffer = { + withResource(RapidsBufferCatalog.acquireBuffer(id)) { rapidsBuffer => + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + rapidsBuffer.getDeviceMemoryBuffer + } + } + + /** + * Remove the buffer from the cache. + */ + override def close(): Unit = { + if (!closed) { + RapidsBufferCatalog.removeBuffer(id) + closed = true + } + } +} + +object SpillableBuffer extends Arm { + + /** + * Create a new SpillableBuffer. + * @note This takes over ownership of buffer, and buffer should not be used after this. + * @param buffer the buffer to make spillable + * @param priority the initial spill priority of this buffer + * @param spillCallback a callback when the buffer is spilled. This should be very light weight. + * It should never allocate GPU memory and really just be used for metrics. + */ + def apply(buffer: DeviceMemoryBuffer, + priority: Long, + spillCallback: RapidsBuffer.SpillCallback): SpillableBuffer = { + val id = TempSpillBufferId() + RapidsBufferCatalog.addBuffer(id, buffer, MetaUtils.ignoreTableMeta, priority, spillCallback) + new SpillableBuffer(id) + } } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index f104efb0ca8..2b7ea1028a9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -54,7 +54,7 @@ class GpuBroadcastNestedLoopJoinMeta( join.joinType match { case Inner => case Cross => - case _ => willNotWorkOnGpu(s"$join.joinType currently is not supported") + case _ => willNotWorkOnGpu(s"${join.joinType} currently is not supported") } val gpuBuildSide = ShimLoader.getSparkShims.getBuildSide(join) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index e18b003a35f..fa3255e0810 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -15,15 +15,16 @@ */ package org.apache.spark.sql.rapids.execution -import ai.rapids.cudf.{NvtxColor, Table} +import ai.rapids.cudf.{GatherMap, NvtxColor, Table} import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.{Cross, ExistenceJoin, FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.types.{ArrayType, MapType, StructType} -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch object JoinTypeChecks { def tagForGpu(joinType: JoinType, meta: RapidsMeta[_, _, _]): Unit = { @@ -55,7 +56,8 @@ object JoinTypeChecks { } } -object GpuHashJoin { +object GpuHashJoin extends Arm { + def tagJoin( meta: RapidsMeta[_, _, _], joinType: JoinType, @@ -90,12 +92,301 @@ object 
GpuHashJoin { } } - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb + def extractTopLevelAttributes( + exprs: Seq[Expression], + includeAlias: Boolean): Seq[Option[Attribute]] = + exprs.map { + case a: AttributeReference => Some(a.toAttribute) + case GpuAlias(a: AttributeReference, _) if includeAlias => Some(a.toAttribute) + case _ => None + } + + /** + * Filter rows from the batch where any of the keys are null. + */ + def filterNulls(cb: ColumnarBatch, boundKeys: Seq[Expression]): ColumnarBatch = { + var mask: ai.rapids.cudf.ColumnVector = null + try { + withResource(GpuProjectExec.project(cb, boundKeys)) { keys => + val keyColumns = GpuColumnVector.extractBases(keys) + keyColumns.foreach { column => + if (column.hasNulls) { + withResource(column.isNotNull) { nn => + if (mask == null) { + mask = nn.incRefCount() + } else { + mask = withResource(mask) { _ => + mask.and(nn) + } + } + } + } + } + } + + if (mask == null) { + // There was nothing to filter. + GpuColumnVector.incRefCounts(cb) + } else { + val colTypes = GpuColumnVector.extractTypes(cb) + withResource(GpuColumnVector.from(cb)) { tbl => + withResource(tbl.filter(mask)) { filtered => + GpuColumnVector.from(filtered, colTypes) + } + } + } + } finally { + if (mask != null) { + mask.close() + } + } + } + + /** + * Given a sequence of expressions, detect whether there exist any StructType expressions + * that contain nullable child columns. + * Since cuDF does not match nullable children the same way Spark does during a join, we detect + * them before the join and apply workaround strategies. For some details, please refer to the + * issue: https://github.com/NVIDIA/spark-rapids/issues/2126. + * + * NOTE that this does not work for arrays of Structs or Maps, which are not supported as join + * keys yet. + */ + def anyNullableStructChild(expressions: Seq[Expression]): Boolean = { + def anyNullableChild(struct: StructType): Boolean = { + struct.fields.exists { field => + if (field.nullable) { + true + } else field.dataType match { + case structType: StructType => + anyNullableChild(structType) + case _ => false + } + } + } + + expressions.map(_.dataType).exists { + case st: StructType => + anyNullableChild(st) + case _ => false + } + } +} + +/** + * An iterator that does a hash join against a stream of batches. + */ +class HashJoinIterator( + inputBuiltKeys: ColumnarBatch, + inputBuiltData: ColumnarBatch, + private val stream: Iterator[ColumnarBatch], + val boundStreamKeys: Seq[Expression], + val boundStreamData: Seq[Expression], + val streamAttributes: Seq[Attribute], + val targetSize: Long, + val joinType: JoinType, + val buildSide: GpuBuildSide, + var compareNullsEqual: Boolean, // This is a workaround to how cudf supports joins for structs + private val spillCallback: SpillCallback, + private val streamTime: GpuMetric, + private val joinTime: GpuMetric, + private val totalTime: GpuMetric) extends Iterator[ColumnarBatch] with Arm { + import scala.collection.JavaConverters._ + + // For some join types even if there is no stream data we might output something + private var initialJoin = true + private var nextCb: Option[ColumnarBatch] = None + private var gathererStore: Option[JoinGatherer] = None + // Close the input keys, the lazy spillable batch now owns it. + private val builtKeys = withResource(inputBuiltKeys) { inputBuiltKeys => + LazySpillableColumnarBatch(inputBuiltKeys, spillCallback, "build_keys") + } + // Close the input data, the lazy spillable batch now owns it. 
+ private val builtData = withResource(inputBuiltData) { inputBuiltData => + LazySpillableColumnarBatch(inputBuiltData, spillCallback, "build_data") + } + private var closed = false + + def close(): Unit = { + if (!closed) { + builtKeys.close() + builtData.close() + nextCb.foreach(_.close()) + nextCb = None + gathererStore.foreach(_.close()) + gathererStore = None + closed = true + } + } + + TaskContext.get().addTaskCompletionListener[Unit](_ => close()) + + private def nextCbFromGatherer(): Option[ColumnarBatch] = { + withResource(new NvtxWithMetrics("hash join gather", NvtxColor.DARK_GREEN, joinTime)) { _ => + val ret = gathererStore.map { gather => + val nextRows = JoinGatherer.getRowsInNextBatch(gather, targetSize) + gather.gatherNext(nextRows) + } + if (gathererStore.exists(_.isDone)) { + gathererStore.foreach(_.close()) + gathererStore = None + } + + if (ret.isDefined) { + // We are about to return something. We got everything we need from it so now let it spill + // if there is more to be gathered later on. + gathererStore.foreach(_.allowSpilling()) + } + ret + } + } + + private def makeGatherer( + maps: Array[GatherMap], + leftData: LazySpillableColumnarBatch, + rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { + assert(maps.length > 0 && maps.length <= 2) + try { + val leftMap = maps.head + val rightMap = if (maps.length > 1) { + if (rightData.numCols == 0) { + // No data so don't bother with it + None + } else { + Some(maps(1)) + } + } else { + None + } + + val lazyLeftMap = new LazySpillableGatherMap(leftMap, spillCallback, "left_map") + val gatherer = rightMap match { + case None => + rightData.close() + JoinGatherer(lazyLeftMap, leftData) + case Some(right) => + val lazyRightMap = new LazySpillableGatherMap(right, spillCallback, "right_map") + JoinGatherer(lazyLeftMap, leftData, lazyRightMap, rightData) + } + if (gatherer.isDone) { + // Nothing matched... 
+ gatherer.close() + None + } else { + Some(gatherer) + } + } finally { + maps.foreach(_.close()) + } + } + + private def joinGathererLeftRight( + leftKeys: Table, + leftData: LazySpillableColumnarBatch, + rightKeys: Table, + rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { + withResource(new NvtxWithMetrics("hash join gather map", NvtxColor.ORANGE, joinTime)) { _ => + val maps = joinType match { + case LeftOuter => leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual) + case RightOuter => + // Reverse the output of the join, because we expect the right gather map to + // always be on the right + rightKeys.leftJoinGatherMaps(leftKeys, compareNullsEqual).reverse + case _: InnerLike => leftKeys.innerJoinGatherMaps(rightKeys, compareNullsEqual) + case LeftSemi => Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual)) + case LeftAnti => Array(leftKeys.leftAntiJoinGatherMap(rightKeys, compareNullsEqual)) + case FullOuter => leftKeys.fullJoinGatherMaps(rightKeys, compareNullsEqual) + case _ => + throw new NotImplementedError(s"Join Type ${joinType.getClass} is not currently" + + s" supported") + } + makeGatherer(maps, leftData, rightData) + } + } + + private def joinGathererLeftRight( + leftKeys: ColumnarBatch, + leftData: LazySpillableColumnarBatch, + rightKeys: ColumnarBatch, + rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { + withResource(GpuColumnVector.from(leftKeys)) { leftKeysTab => + withResource(GpuColumnVector.from(rightKeys)) { rightKeysTab => + joinGathererLeftRight(leftKeysTab, leftData, rightKeysTab, rightData) + } + } + } + + private def joinGatherer( + buildKeys: ColumnarBatch, + buildData: LazySpillableColumnarBatch, + streamKeys: ColumnarBatch, + streamData: LazySpillableColumnarBatch): Option[JoinGatherer] = { + buildSide match { + case GpuBuildLeft => + joinGathererLeftRight(buildKeys, buildData, streamKeys, streamData) + case GpuBuildRight => + joinGathererLeftRight(streamKeys, streamData, buildKeys, buildData) + } + } + + private def joinGatherer( + buildKeys: ColumnarBatch, + buildData: LazySpillableColumnarBatch, + streamCb: ColumnarBatch): Option[JoinGatherer] = { + withResource(GpuProjectExec.project(streamCb, boundStreamKeys)) { streamKeys => + withResource(GpuProjectExec.project(streamCb, boundStreamData)) { streamData => + joinGatherer(buildKeys, LazySpillableColumnarBatch.spillOnly(buildData), + streamKeys, LazySpillableColumnarBatch(streamData, spillCallback, "stream_data")) + } + } + } + + override def hasNext: Boolean = { + var mayContinue = true + while (nextCb.isEmpty && mayContinue) { + val startTime = System.nanoTime() + if (gathererStore.exists(!_.isDone)) { + nextCb = nextCbFromGatherer() + } else if (stream.hasNext) { + // Need to refill the gatherer + gathererStore.foreach(_.close()) + gathererStore = None + withResource(stream.next()) { cb => + streamTime += (System.nanoTime() - startTime) + gathererStore = joinGatherer(builtKeys.getBatch, builtData, cb) + } + nextCb = nextCbFromGatherer() + } else if (initialJoin) { + withResource(GpuColumnVector.emptyBatch(streamAttributes.asJava)) { cb => + gathererStore = joinGatherer(builtKeys.getBatch, builtData, cb) + } + nextCb = nextCbFromGatherer() + } else { + mayContinue = false + } + totalTime += (System.nanoTime() - startTime) + initialJoin = false + } + if (nextCb.isEmpty) { + // Nothing is left to return so close ASAP. 
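+ // (hasNext above does all of the heavy lifting: gather more from the current
+ // gatherer, build a new gatherer from the next stream batch, or do the initial
+ // join for join types that can produce output even when the stream side is empty.)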
+ close()
+ } else {
+ builtKeys.allowSpilling()
+ }
+ nextCb.isDefined
+ }
+
+ override def next(): ColumnarBatch = {
+ if (!hasNext) {
+ throw new NoSuchElementException()
+ }
+ val ret = nextCb.get
+ nextCb = None
+ ret
 }
 }
+
 trait GpuHashJoin extends GpuExec {
 def left: SparkPlan
 def right: SparkPlan
@@ -156,15 +447,31 @@ trait GpuHashJoin extends GpuExec {
 }
 }

- protected lazy val (gpuBuildKeys, gpuStreamedKeys) = {
- require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
- "Join keys from two sides should have same types")
- val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output)
- val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output)
- buildSide match {
- case GpuBuildLeft => (lkeys, rkeys)
- case GpuBuildRight => (rkeys, lkeys)
+ def dedupDataFromKeys(
+ rightOutput: Seq[Attribute],
+ rightKeys: Seq[Expression],
+ leftKeys: Seq[Expression]): (Seq[Attribute], Seq[NamedExpression]) = {
+ // We need a mapping from what we remove on the right to what in leftData can
+ // provide it. These are still symbolic references, so we are going to convert everything into
+ // attributes and use them to make our mapping.
+ val leftKeyAttributes = GpuHashJoin.extractTopLevelAttributes(leftKeys, includeAlias = true)
+ val rightKeyAttributes = GpuHashJoin.extractTopLevelAttributes(rightKeys, includeAlias = false)
+ val zippedKeysMapping = rightKeyAttributes.zip(leftKeyAttributes)
+ val rightToLeftKeyMap = zippedKeysMapping.filter {
+ case (Some(_), Some(_: AttributeReference)) => true
+ case _ => false
+ }.map {
+ case (Some(right), Some(left)) => (right.exprId, left)
+ case _ => throw new IllegalStateException("INTERNAL ERROR THIS SHOULD NOT BE REACHABLE")
+ }.toMap
+
+ val rightData = rightOutput.filterNot(att => rightToLeftKeyMap.contains(att.exprId))
+ val remappedRightOutput = rightOutput.map { att =>
+ rightToLeftKeyMap.get(att.exprId)
+ .map(leftAtt => GpuAlias(leftAtt, att.name)(att.exprId))
+ .getOrElse(att)
 }
+ (rightData, remappedRightOutput)
 }

 // For join types other than FullOuter, we simply set compareNullsEqual as true to adapt
@@ -172,48 +479,87 @@ trait GpuHashJoin extends GpuExec {
 // compareNullsEqual = true, because we filter all null records from build table before join.
 // For some details, please refer the issue: https://github.com/NVIDIA/spark-rapids/issues/2126
 protected lazy val compareNullsEqual: Boolean = (joinType != FullOuter) &&
- anyNullableStructChild(gpuBuildKeys)
+ GpuHashJoin.anyNullableStructChild(buildKeys)

 /**
- * Place the columns in left and the columns in right into a single ColumnarBatch
+ * Spark does joins rather simply: it processes them row by row, and as such doesn't really
+ * worry about how much space is being taken up. We are doing this in batches, and have the
+ * option to deduplicate columns that we know are the same to save even more memory.
+ *
+ * As such we do the join in a few different stages.
+ *
+ * 1. We separate out the join keys from the data that will be gathered. The join keys are used
+ * to produce a gather map, and then can be released. The data needs to stay until it has been
+ * gathered. Depending on the type of join, the output is likely to contain the join keys
+ * twice. We don't want that, because it takes up too much memory, so we remove the keys from
+ * the data for one side of the join.
+ *
+ * 2. After this we will do the join. We can produce multiple batches from a single
+ * pair of input batches.
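+ * (An inner join with heavily duplicated keys, for example, can produce far more
+ * rows than either input, so the output is chunked to stay near the configured
+ * target batch size; a simplified sketch of this chunked gather is at the end of
+ * this patch.)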
+ * The output of this stage is called the intermediate output and is the
+ * data columns from each side of the join smashed together.
+ *
+ * 3. In some cases there is a condition that filters out data from the join that should not be
+ * included. In the CPU code the condition will operate on the intermediate output. In some cases
+ * the condition may need to be rewritten to point to the deduplicated key column.
+ *
+ * 4. Finally we need to fix up the data to produce the correct output. This should be a simple
+ * projection that puts the deduplicated keys back to where they need to be.
+ */
- def combine(left: ColumnarBatch, right: ColumnarBatch): ColumnarBatch = {
- val l = GpuColumnVector.extractColumns(left)
- val r = GpuColumnVector.extractColumns(right)
- val c = l ++ r
- new ColumnarBatch(c.asInstanceOf[Array[ColumnVector]], left.numRows())
+ protected lazy val (leftData, rightData, intermediateOutput, finalProject) = {
+ require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
+ "Join keys from two sides should have same types")
+ val (leftData, remappedLeftOutput, rightData, remappedRightOutput) = joinType match {
+ case FullOuter | RightOuter | LeftOuter =>
+ // We cannot dedupe anything here because we can get nulls in the key columns on
+ // at least one side, so they do not match
+ (left.output, left.output, right.output, right.output)
+ case LeftSemi | LeftAnti =>
+ // These joins only need the keys from the right hand side; in fact the right side
+ // should produce nothing but keys. A condition could require more, but we don't
+ // support conditions for these joins, so it is OK
+ (left.output, left.output, Seq.empty, Seq.empty)
+ case _: InnerLike =>
+ val (rightData, remappedRightData) = dedupDataFromKeys(right.output, rightKeys, leftKeys)
+ (left.output, left.output, rightData, remappedRightData)
+ case x =>
+ throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType")
+ }
+
+ val intermediateOutput = leftData ++ rightData
+
+ val finalProject: Seq[Expression] = joinType match {
+ case _: InnerLike | LeftOuter | RightOuter | FullOuter =>
+ remappedLeftOutput ++ remappedRightOutput
+ case LeftExistence(_) =>
+ remappedLeftOutput
+ case x =>
+ throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType")
+ }
+ (leftData, rightData, intermediateOutput, finalProject)
 }

- // TODO eventually dedupe the keys
- lazy val joinKeyIndices: Range = gpuBuildKeys.indices
-
- val localBuildOutput: Seq[Attribute] = buildPlan.output
- // The first columns are the ones we joined on and need to remove
- lazy val joinIndices: Seq[Int] = joinType match {
- case RightOuter =>
- // The left table and right table are switched in the output
- // because we don't support a right join, only left
- val numRight = right.output.length
- val numLeft = left.output.length
- val joinLength = joinKeyIndices.length
- def remap(index: Int): Int = {
- if (index < numLeft) {
- // part of the left table, but is on the right side of the tmp output
- index + joinLength + numRight
- } else {
- // part of the right table, but is on the left side of the tmp output
- index + joinLength - numLeft
- }
- }
- output.indices.map (remap)
- case _ =>
- val joinLength = joinKeyIndices.length
- output.indices.map (v => v + joinLength)
+ protected lazy val (boundBuildKeys, boundBuildData,
+ boundStreamKeys, boundStreamData,
+ boundCondition, boundFinalProject) = {
+ val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output)
+ val ldata =
GpuBindReferences.bindGpuReferences(leftData, left.output)
+ val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output)
+ val rdata = GpuBindReferences.bindGpuReferences(rightData, right.output)
+ val boundCondition =
+ condition.map(c => GpuBindReferences.bindGpuReference(c, intermediateOutput))
+ val boundFinalProject = GpuBindReferences.bindGpuReferences(finalProject, intermediateOutput)
+
+ buildSide match {
+ case GpuBuildLeft => (lkeys, ldata, rkeys, rdata, boundCondition, boundFinalProject)
+ case GpuBuildRight => (rkeys, rdata, lkeys, ldata, boundCondition, boundFinalProject)
+ }
 }

- def doJoin(builtTable: Table,
+ def doJoin(
+ builtBatch: ColumnarBatch,
 stream: Iterator[ColumnarBatch],
- boundCondition: Option[Expression],
+ targetSize: Long,
+ spillCallback: SpillCallback,
 numOutputRows: GpuMetric,
 joinOutputRows: GpuMetric,
 numOutputBatches: GpuMetric,
@@ -221,229 +567,62 @@
 joinTime: GpuMetric,
 filterTime: GpuMetric,
 totalTime: GpuMetric): Iterator[ColumnarBatch] = {
- new Iterator[ColumnarBatch] {
- import scala.collection.JavaConverters._
- var nextCb: Option[ColumnarBatch] = None
- var first: Boolean = true
-
- TaskContext.get().addTaskCompletionListener[Unit](_ => closeCb())
-
- def closeCb(): Unit = {
- nextCb.foreach(_.close())
- nextCb = None
- }
-
- override def hasNext: Boolean = {
- var mayContinue = true
- while (nextCb.isEmpty && mayContinue) {
- val startTime = System.nanoTime()
- if (stream.hasNext) {
- val cb = stream.next()
- streamTime += (System.nanoTime() - startTime)
- nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
- numOutputBatches, joinTime, filterTime)
- totalTime += (System.nanoTime() - startTime)
- } else if (first) {
- // We have to at least try one in some cases
- val cb = GpuColumnVector.emptyBatch(streamedPlan.output.asJava)
- streamTime += (System.nanoTime() - startTime)
- nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
- numOutputBatches, joinTime, filterTime)
- totalTime += (System.nanoTime() - startTime)
- } else {
- mayContinue = false
- }
- first = false
- }
- nextCb.isDefined
- }
-
- override def next(): ColumnarBatch = {
- if (!hasNext) {
- throw new NoSuchElementException()
- }
- val ret = nextCb.get
- nextCb = None
- ret
- }
- }
- }
-
- private[this] def doJoin(builtTable: Table,
- streamedBatch: ColumnarBatch,
- boundCondition: Option[Expression],
- numOutputRows: GpuMetric,
- numJoinOutputRows: GpuMetric,
- numOutputBatches: GpuMetric,
- joinTime: GpuMetric,
- filterTime: GpuMetric): Option[ColumnarBatch] = {
-
- val combined = withResource(streamedBatch) { streamedBatch =>
- withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
- streamedKeysBatch =>
- GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
+ // The 10k is mostly for tests; hopefully no one is setting anything that low in production.
+ val realTarget = Math.max(targetSize, 10 * 1024)
+
+ val (builtKeys, builtData) = {
+ // Filtering nulls on the build side is a workaround for two issues:
+ // 1) a performance issue in LeftSemi and LeftAnti joins
+ // https://github.com/rapidsai/cudf/issues/7300
+ // 2) struct joins with nullable children
+ // (see https://github.com/NVIDIA/spark-rapids/issues/2126 for more info)
+ val builtAnyNullable = (compareNullsEqual || joinType == LeftSemi || joinType == LeftAnti) &&
+ buildKeys.exists(_.nullable)
+
+ val cb = if (builtAnyNullable) {
+ GpuHashJoin.filterNulls(builtBatch, boundBuildKeys)
+ } else {
+ GpuColumnVector.incRefCounts(builtBatch)
 }
- }
- val streamedTable = withResource(combined) { cb =>
- GpuColumnVector.from(cb)
- }
+ withResource(cb) { cb =>
+ closeOnExcept(GpuProjectExec.project(cb, boundBuildKeys)) { builtKeys =>
+ (builtKeys, GpuProjectExec.project(cb, boundBuildData))
 }
 }
-
- val joined =
- withResource(new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)) { _ =>
- // `doJoinLeftRight` closes the right table if the last argument (`closeRightTable`)
- // is true, but never closes the left table.
- buildSide match {
- case GpuBuildLeft =>
- // tell `doJoinLeftRight` it is ok to close the `streamedTable`, this can help
- // in order to close temporary/intermediary data after a filter in some scenarios.
- doJoinLeftRight(builtTable, streamedTable, true)
- case GpuBuildRight =>
- // tell `doJoinLeftRight` to not close `builtTable`, as it is owned by our caller,
- // here we close the left table as that one is never closed by `doJoinLeftRight`.
- withResource(streamedTable) { _ =>
- doJoinLeftRight(streamedTable, builtTable, false)
- }
 }
 }
-
- numJoinOutputRows += joined.numRows()
-
- val tmp = if (boundCondition.isDefined) {
- GpuFilter(joined, boundCondition.get, numOutputRows, numOutputBatches, filterTime)
- } else {
- numOutputRows += joined.numRows()
- numOutputBatches += 1
- joined
 }
- if (tmp.numRows() == 0) {
- // Not sure if there is a better way to work around this
- numOutputBatches.set(numOutputBatches.value - 1)
- tmp.close()
- None
- } else {
- Some(tmp)
- }
- }

- // This is a work around added in response to https://github.com/NVIDIA/spark-rapids/issues/1643.
- // to deal with slowness arising from many nulls in the build-side of the join. The work around
- // should be removed when https://github.com/rapidsai/cudf/issues/7300 is addressed.
- private[this] def filterNulls(table: Table, joinKeyIndices: Range, closeTable: Boolean): Table = {
- var mask: ai.rapids.cudf.ColumnVector = null
- try {
- joinKeyIndices.indices.foreach { c =>
- mask = withResource(table.getColumn(c).isNotNull) { nn =>
- if (mask == null) {
- nn.incRefCount()

+ // The HashJoinIterator takes ownership of the built keys and built data.
It will close + // them when it is done + val joinIterator = + new HashJoinIterator(builtKeys, builtData, stream, boundStreamKeys, boundStreamData, + streamedPlan.output, realTarget, joinType, buildSide, compareNullsEqual, spillCallback, + streamTime, joinTime, totalTime) + val boundFinal = boundFinalProject + if (boundCondition.isDefined) { + val condition = boundCondition.get + joinIterator.flatMap { cb => + joinOutputRows += cb.numRows() + withResource( + GpuFilter(cb, condition, numOutputRows, numOutputBatches, filterTime)) { filtered => + if (filtered.numRows == 0) { + // Not sure if there is a better way to work around this + numOutputBatches.set(numOutputBatches.value - 1) + None } else { - withResource(mask) { _ => - mask.and(nn) - } + Some(GpuProjectExec.project(filtered, boundFinal)) } } } - table.filter(mask) - } finally { - if (mask != null) { - mask.close() - } - - // in some cases, we cannot close the table since it was the build table and is - // reused. - if (closeTable) { - table.close() - } - } - } - - private[this] def doJoinLeftRight( - leftTable: Table, rightTable: Table, closeRightTable: Boolean): ColumnarBatch = { - - def withRightTable(body: Table => Table): Table = { - // Run nullable check on cuDF columns rather than Spark Schema, because right table may be - // filtered in previous (if it is built-side). - val builtAnyNullable = - (joinType == LeftSemi || joinType == LeftAnti) && - joinKeyIndices.exists(rightTable.getColumn(_).hasNulls) - if (builtAnyNullable) { - withResource(filterNulls(rightTable, joinKeyIndices, closeRightTable)) { filtered => - body(filtered) - } - } else { - try { - body(rightTable) - } finally { - if (closeRightTable) { - rightTable.close() - } - } - } - } - - val joinedTable = withRightTable { rt => - joinType match { - case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) - .leftJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) - case RightOuter => rt.onColumns(joinKeyIndices: _*) - .leftJoin(leftTable.onColumns(joinKeyIndices: _*), compareNullsEqual) - case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) - .innerJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) - case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) - .leftSemiJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) - case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) - .leftAntiJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) - case FullOuter => leftTable.onColumns(joinKeyIndices: _*) - .fullJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) - case _ => - throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") - } - } - - try { - val result = joinIndices.zip(output).map { case (joinIndex, outAttr) => - GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType) - }.toArray[ColumnVector] - - new ColumnarBatch(result, joinedTable.getRowCount.toInt) - } finally { - joinedTable.close() - } - } - - /** - * Filter null values for build-side keys, so as to ensure no nullable join column included in - * built table if compareNullsEqual is true. - */ - protected def filterBuiltNullsIfNecessary(table: Table): Table = closeOnExcept(table) { t => - if (compareNullsEqual && gpuBuildKeys.exists(_.nullable)) { - filterNulls(t, joinKeyIndices, closeTable = true) } else { - t - } - } - - /** - * Given sequence of GPU expressions, detect whether there exists any StructType expressions - * who contains nullable child columns. 
- * Since cuDF can not match nullable children as Spark during join, we detect them before join - * to apply some walking around strategies. For some details, please refer the issue: - * https://github.com/NVIDIA/spark-rapids/issues/2126. - */ - private[this] def anyNullableStructChild(expressions: Seq[GpuExpression]): Boolean = { - def anyNullableChild(struct: StructType): Boolean = struct.fields.exists { field => - if (field.nullable) { - true - } else field.dataType match { - case structType: StructType => anyNullableChild(structType) - case _ => false + joinIterator.map { cb => + withResource(cb) { cb => + joinOutputRows += cb.numRows() + numOutputRows += cb.numRows() + numOutputBatches += 1 + GpuProjectExec.project(cb, boundFinal) + } } } - - expressions.exists { - case expression if expression.dataType.isInstanceOf[StructType] => - anyNullableChild(expression.dataType.asInstanceOf[StructType]) - case _ => false - } } }
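
To make the chunked gather described in the join stages above concrete, here is a minimal CPU-side sketch of the pattern the new HashJoinIterator implements. Everything in it (Row, Gatherer, the row cap passed to gatherNext) is invented for illustration only; the real code builds cudf gather maps on the GPU, keeps them and the batches they reference in spillable wrappers, and caps each output batch by a byte-size target rather than a row count.

object ChunkedGatherSketch {
  final case class Row(key: Int, value: String)

  // A gather map paired with the data it indexes: map(i) is the index of the
  // input row that produces output row i. gatherNext() plays the role of
  // JoinGatherer.gatherNext, emitting one bounded batch per call.
  final class Gatherer(map: Array[Int], data: Array[Row]) {
    private var pos = 0
    def isDone: Boolean = pos >= map.length
    def gatherNext(maxRows: Int): Array[Row] = {
      val end = math.min(pos + maxRows, map.length)
      val out = (pos until end).map(i => data(map(i))).toArray
      pos = end
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val build = Array(Row(1, "a"), Row(2, "b"))
    val stream = Array(Row(1, "x"), Row(1, "y"), Row(2, "z"), Row(3, "w"))
    // Build an inner-join gather map over the build side: for each stream row,
    // record the index of every matching build row.
    val gatherMap = stream.flatMap { s =>
      build.zipWithIndex.collect { case (b, i) if b.key == s.key => i }
    }
    val gatherer = new Gatherer(gatherMap, build)
    // Emit the join output as several small batches instead of one big batch,
    // which is the whole point of allowing a join's output to be batched.
    while (!gatherer.isDone) {
      println(gatherer.gatherNext(2).mkString(", "))
    }
  }
}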