Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow batching the output of a join #2310

Merged
merged 10 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
def _assert_equal(cpu, gpu, float_check, path):
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {}".format(path)
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {}".format(path)
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
Expand Down
14 changes: 10 additions & 4 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ def do_join(spark):
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_array(data_gen, join_type):
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
def test_sortmerge_join_array(data_gen, join_type, batch_size):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
conf.update(_sortmerge_join_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero')
@ignore_order(local=True)
Expand All @@ -112,11 +115,14 @@ def do_join(spark):
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_struct(data_gen, join_type):
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
def test_sortmerge_join_struct(data_gen, join_type, batch_size):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
conf.update(_sortmerge_join_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
@ignore_order(local=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -100,7 +100,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -141,28 +141,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
jlowe marked this conversation as resolved.
Show resolved Hide resolved
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -98,7 +98,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -139,28 +139,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -97,7 +97,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -138,28 +138,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ class GpuShuffledHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val Seq(leftChild, rightChild) = childPlans.map(_.convertIfNeeded())
val Seq(left, right) = childPlans.map(_.convertIfNeeded)
GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
GpuJoinUtils.getGpuBuildSide(join.buildSide),
condition.map(_.convertToGpu()),
leftChild,
rightChild)
left,
right)
}
}

Expand All @@ -75,12 +75,12 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._
Expand All @@ -102,7 +102,7 @@ case class GpuBroadcastHashJoinExec(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -143,28 +143,20 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
rdd.mapPartitions { it =>
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class GpuShuffledHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val Seq(left, right) = childPlans.map(_.convertIfNeeded)
GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
Expand All @@ -76,7 +76,7 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static synchronized void debug(String name, ColumnarBatch cb) {
* @param name the name of the column to print out.
* @param col the column to print out.
*/
public static synchronized void debug(String name, ai.rapids.cudf.ColumnVector col) {
public static synchronized void debug(String name, ai.rapids.cudf.ColumnView col) {
try (HostColumnVector hostCol = col.copyToHost()) {
debug(name, hostCol);
}
Expand Down Expand Up @@ -671,7 +671,8 @@ static boolean typeConversionAllowed(Table table, DataType[] colTypes, int start
*/
static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
final int numColumns = table.getNumberOfColumns();
assert numColumns == colTypes.length: "The number of columns and the number of types don't match";
assert numColumns == colTypes.length: "The number of columns and the number of types don't " +
"match " + table + " " + Arrays.toString(colTypes);
boolean ret = true;
for (int colIndex = 0; colIndex < numColumns; colIndex++) {
ret = ret && typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ case class GpuBoundReference(ordinal: Int, dataType: DataType, nullable: Boolean
override def toString: String = s"input[$ordinal, ${dataType.simpleString}, $nullable]"

override def columnarEval(batch: ColumnarBatch): Any = {
batch.column(ordinal).asInstanceOf[GpuColumnVector].incRefCount()
batch.column(ordinal) match {
case fb: GpuColumnVectorFromBuffer =>
// When doing a project we might re-order columns or do other things that make it
// so this no longer looks like the original contiguous buffer it came from
// so to avoid it appearing to down stream processing as the same buffer we change
// the type here.
new GpuColumnVector(fb.dataType(), fb.getBase.incRefCount())
case cv: GpuColumnVector => cv.incRefCount()
}
}
}
Loading