Allow batching the output of a join #2310

Merged: 10 commits, May 3, 2021
Changes from 1 commit
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/asserts.py
@@ -28,15 +28,15 @@
def _assert_equal(cpu, gpu, float_check, path):
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {}".format(path)
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {}".format(path)
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
elif (t is pytypes.GeneratorType):
14 changes: 10 additions & 4 deletions integration_tests/src/main/python/join_test.py
@@ -93,11 +93,14 @@ def do_join(spark):
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_array(data_gen, join_type):
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
def test_sortmerge_join_array(data_gen, join_type, batch_size):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
conf.update(_sortmerge_join_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
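
The batch-size override in these tests simply layers spark.rapids.sql.batchSizeBytes on top of the usual sort-merge-join config. As a minimal sketch of the same idea outside the test harness (assuming a cluster with the RAPIDS Accelerator plugin available; the table sizes and the 1000-byte target are illustrative, not taken from this PR), a tiny batch target forces the join output to be split across many GPU batches:

from pyspark.sql import SparkSession

# Illustrative only: a very small batch target so even a modest join must
# emit its output across multiple GPU batches instead of one large one.
spark = (SparkSession.builder
         .appName("join-output-batching-sketch")
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.rapids.sql.batchSizeBytes", "1000")
         .getOrCreate())

left = spark.range(0, 100000).withColumnRenamed("id", "key")
right = spark.range(0, 100000).withColumnRenamed("id", "r_key")
print(left.join(right, left.key == right.r_key, "inner").count())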

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero')
@ignore_order(local=True)
@@ -112,11 +115,14 @@ def do_join(spark):
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_struct(data_gen, join_type):
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
def test_sortmerge_join_struct(data_gen, join_type, batch_size):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
conf.update(_sortmerge_join_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
@ignore_order(local=True)
@@ -89,18 +89,20 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

private [this] lazy val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -141,28 +143,17 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
lazy val builtBatch = broadcastRelation.value.batch

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime))
}
}
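
The updated call above is the core of the change: doJoin now receives the unprojected broadcast batch plus a target batch size and a spill callback, rather than a pre-built cudf Table and a bound condition (the condition is now handled through the overridden condition member). The shared GpuHashJoin code is not part of this commit's diff, so the following Scala sketch only shows the shape of the signature these call sites imply; parameter names and types are assumptions, not copied from the plugin:

// Sketch inferred from the updated call sites in this diff, not the actual trait definition.
def doJoin(
    builtBatch: ColumnarBatch,         // build-side batch, kept so it can be re-split/spilled
    stream: Iterator[ColumnarBatch],   // stream-side input batches
    targetSize: Long,                  // spark.rapids.sql.batchSizeBytes, caps each output batch
    spillCallback: SpillCallback,      // feeds the spill metrics added via ++ spillMetrics
    numOutputRows: GpuMetric,
    joinOutputRows: GpuMetric,
    numOutputBatches: GpuMetric,
    streamTime: GpuMetric,
    joinTime: GpuMetric,
    filterTime: GpuMetric,
    totalTime: GpuMetric): Iterator[ColumnarBatch]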
@@ -75,7 +75,7 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
@@ -87,18 +87,20 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

private [this] lazy val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -139,28 +141,17 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
lazy val builtBatch = broadcastRelation.value.batch

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime))
}
}
@@ -86,18 +86,20 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

private [this] lazy val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -138,28 +140,17 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
lazy val builtBatch = broadcastRelation.value.batch

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime))
}
}
@@ -58,15 +58,15 @@ class GpuShuffledHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val Seq(leftChild, rightChild) = childPlans.map(_.convertIfNeeded())
val Seq(left, right) = childPlans.map(_.convertIfNeeded)
GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
GpuJoinUtils.getGpuBuildSide(join.buildSide),
condition.map(_.convertToGpu()),
leftChild,
rightChild)
left,
right)
}
}

@@ -75,12 +75,12 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
@@ -91,18 +91,20 @@ case class GpuBroadcastHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

private [this] lazy val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -143,28 +145,17 @@ case class GpuBroadcastHashJoinExec(
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

val broadcastRelation = broadcastExchange
.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
}
lazy val builtBatch = broadcastRelation.value.batch

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions(it =>
doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows,
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
filterTime, totalTime))
}
}
@@ -58,7 +58,7 @@ class GpuShuffledHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val Seq(left, right) = childPlans.map(_.convertIfNeeded)
GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
Expand All @@ -76,7 +76,7 @@ case class GpuShuffledHashJoinExec(
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
condition: Option[Expression],
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
@@ -81,7 +81,7 @@ public static synchronized void debug(String name, ColumnarBatch cb) {
* @param name the name of the column to print out.
* @param col the column to print out.
*/
public static synchronized void debug(String name, ai.rapids.cudf.ColumnVector col) {
public static synchronized void debug(String name, ai.rapids.cudf.ColumnView col) {
try (HostColumnVector hostCol = col.copyToHost()) {
debug(name, hostCol);
}
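
Widening the debug helper from ai.rapids.cudf.ColumnVector to ai.rapids.cudf.ColumnView lets intermediate views (slices, children of nested columns) be dumped without first copying them into a standalone vector; ColumnVector extends ColumnView, so existing callers still compile. A small hedged example ("DebugUtils" stands in for whatever class hosts these helpers, which is not named in this diff):

try (ai.rapids.cudf.ColumnVector col = ai.rapids.cudf.ColumnVector.fromInts(1, 2, 3)) {
  // Any ColumnView is accepted now, including a full vector or a view over part of one.
  DebugUtils.debug("sample-ints", col);
}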