Skip to content

Commit

Permalink
Allow batching the output of a join (NVIDIA#2310)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored May 3, 2021
1 parent cf3a68d commit 1d03487
Show file tree
Hide file tree
Showing 17 changed files with 1,193 additions and 397 deletions.
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
def _assert_equal(cpu, gpu, float_check, path):
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {}".format(path)
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {}".format(path)
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
Expand Down
9 changes: 6 additions & 3 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def do_join(spark):
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_sortmerge_join_array(data_gen, join_type, batch_size):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
Expand All @@ -132,11 +132,14 @@ def do_join(spark):
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_struct(data_gen, join_type):
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
def test_sortmerge_join_struct(data_gen, join_type, batch_size):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
conf.update(_sortmerge_join_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)

# For spark to insert a shuffled hash join it has to be enabled with
# "spark.sql.join.preferSortMergeJoin" = "false" and both sides have to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -100,7 +100,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -141,28 +141,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -98,7 +98,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -139,28 +139,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -97,7 +97,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -138,28 +138,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ class GpuShuffledHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val Seq(leftChild, rightChild) = childPlans.map(_.convertIfNeeded())
val Seq(left, right) = childPlans.map(_.convertIfNeeded)
GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
GpuJoinUtils.getGpuBuildSide(join.buildSide),
condition.map(_.convertToGpu()),
leftChild,
rightChild)
left,
right)
}
}

Expand All @@ -75,12 +75,12 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -102,7 +102,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -143,28 +143,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class GpuShuffledHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val Seq(left, right) = childPlans.map(_.convertIfNeeded)
GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
Expand All @@ -76,7 +76,7 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static synchronized void debug(String name, ColumnarBatch cb) {
* @param name the name of the column to print out.
* @param col the column to print out.
*/
public static synchronized void debug(String name, ai.rapids.cudf.ColumnVector col) {
public static synchronized void debug(String name, ai.rapids.cudf.ColumnView col) {
try (HostColumnVector hostCol = col.copyToHost()) {
debug(name, hostCol);
}
Expand Down Expand Up @@ -671,7 +671,8 @@ static boolean typeConversionAllowed(Table table, DataType[] colTypes, int start
*/
static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
final int numColumns = table.getNumberOfColumns();
assert numColumns == colTypes.length: "The number of columns and the number of types don't match";
assert numColumns == colTypes.length: "The number of columns and the number of types don't " +
"match " + table + " " + Arrays.toString(colTypes);
boolean ret = true;
for (int colIndex = 0; colIndex < numColumns; colIndex++) {
ret = ret && typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ case class GpuBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean
override def toString: String = s"input[$ordinal, ${dataType.simpleString}, $nullable]"

override def columnarEval(batch: ColumnarBatch): Any = {
batch.column(ordinal).asInstanceOf[GpuColumnVector].incRefCount()
batch.column(ordinal) match {
case fb: GpuColumnVectorFromBuffer =>
// When doing a project we might re-order columns or do other things that make it
// so this no longer looks like the original contiguous buffer it came from
// so to avoid it appearing to down stream processing as the same buffer we change
// the type here.
new GpuColumnVector(fb.dataType(), fb.getBase.incRefCount())
case cv: GpuColumnVector => cv.incRefCount()
}
}
}
Loading

0 comments on commit 1d03487

Please sign in to comment.