Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use libcudf mixed joins for conditional hash joins [databricks] #4477

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 123 additions & 9 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,12 +58,18 @@
nested_3d_struct_gens = StructGen([['child0', nested_2d_struct_gens]], nullable=False)
struct_gens = [basic_struct_gen, basic_struct_gen_with_no_null_child, nested_2d_struct_gens, nested_3d_struct_gens]

double_gen = [pytest.param(DoubleGen(), marks=[incompat])]

basic_nested_gens = single_level_array_gens + map_string_string_gen + [all_basic_struct_gen]

# data types supported by AST expressions
ast_gen = [boolean_gen, byte_gen, short_gen, int_gen, long_gen, timestamp_gen]
# data types supported by AST expressions in joins
join_ast_gen = [
boolean_gen, byte_gen, short_gen, int_gen, long_gen, date_gen, timestamp_gen
]

# data types not supported by AST expressions in joins
join_no_ast_gen = [
pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat]),
string_gen, null_gen, decimal_gen_default, decimal_gen_64bit
]

_sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
'spark.sql.join.preferSortMergeJoin': 'True',
Expand Down Expand Up @@ -349,7 +355,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross'], ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_right_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size):
Expand All @@ -366,7 +372,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen, batch_size):
def do_join(spark):
Expand Down Expand Up @@ -497,15 +503,74 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'Inner', 'Cross'], ids=idfn)
def test_broadcast_join_with_conditionals(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b >= right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_with_condition_join_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# AST does not support cast or logarithm yet
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > right.r_b), join_type)
conf = allow_negative_scale_of_decimal_conf
assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan', 'Log', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_with_condition_ast_op_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# AST does not support cast or logarithm yet
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type)
conf = allow_negative_scale_of_decimal_conf
exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec'
assert_gpu_fallback_collect(do_join, exec, conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_with_condition_ast_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# AST does not support cast or logarithm yet
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > right.r_b), join_type)
conf = allow_negative_scale_of_decimal_conf
exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec'
assert_gpu_fallback_collect(do_join, exec, conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
def test_broadcast_join_with_condition_post_filter(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
Expand All @@ -516,12 +581,61 @@ def do_join(spark):
return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), 'Inner')
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter'], ids=idfn)
def test_sortmerge_join_with_condition_ast(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('GreaterThan', 'ShuffleExchangeExec', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn)
def test_sortmerge_join_with_condition_join_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('GreaterThan', 'Log', 'ShuffleExchangeExec', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter'], ids=idfn)
def test_sortmerge_join_with_condition_ast_op_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
# AST does not support cast or logarithm yet
return left.join(right, (left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('GreaterThan', 'ShuffleExchangeExec', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_sortmerge_join_with_condition_ast_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, (left.a == right.r_a) & (left.b > right.r_b), join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)


_mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)),
('b', IntegerGen()), ('c', LongGen())]
_mixed_df2_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)),
('b', StringGen()), ('c', BooleanGen())]


@ignore_order
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'FullOuter', 'Cross'], ids=idfn)
def test_broadcast_join_mixed(join_type):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ abstract class AbstractGpuJoinIterator(
* Called to setup the next join gatherer instance when the previous instance is done or
* there is no previous instance. Because this is likely to call next or has next on the
* stream side all implementations must track their own opTime metrics.
* @param startNanoTime system nanoseconds timestamp at the top of the iterator loop, useful for
* calculating the time spent producing the next stream batch
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ class GpuBroadcastHashJoinMeta(
join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val rightKeys: Seq[BaseExprMeta[_]] =
join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val condition: Option[BaseExprMeta[_]] =
val conditionMeta: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide)

override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

override def tagPlanForGpu(): Unit = {
GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
join.condition)
conditionMeta)
val Seq(leftChild, rightChild) = childPlans
val buildSideMeta = buildSide match {
case GpuBuildLeft => leftChild
Expand All @@ -74,6 +74,12 @@ class GpuBroadcastHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val condition = conditionMeta.map(_.convertToGpu())
val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) {
(condition, None)
} else {
(None, condition)
}
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
// The broadcast part of this must be a BroadcastExchangeExec
val buildSideMeta = buildSide match {
Expand All @@ -86,11 +92,11 @@ class GpuBroadcastHashJoinMeta(
rightKeys.map(_.convertToGpu()),
join.joinType,
buildSide,
None,
joinCondition,
left, right)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
// For inner joins we can apply a post-join condition for any conditions that cannot be
// evaluated directly in a mixed join that leverages a cudf AST expression
filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,42 @@ class GpuShuffledHashJoinMeta(
join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val rightKeys: Seq[BaseExprMeta[_]] =
join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val condition: Option[BaseExprMeta[_]] =
val conditionMeta: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide)

override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

override def tagPlanForGpu(): Unit = {
GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
join.condition)
conditionMeta)
}

override def convertToGpu(): GpuExec = {
val condition = conditionMeta.map(_.convertToGpu())
val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) {
(condition, None)
} else {
(None, condition)
}
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val joinExec = GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
buildSide,
None,
joinCondition,
left,
right,
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
// For inner joins we can apply a post-join condition for any conditions that cannot be
// evaluated directly in a mixed join that leverages a cudf AST expression
filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class GpuSortMergeJoinMeta(
join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val rightKeys: Seq[BaseExprMeta[_]] =
join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val condition: Option[BaseExprMeta[_]] = join.condition.map(
val conditionMeta: Option[BaseExprMeta[_]] = join.condition.map(
GpuOverrides.wrapExpr(_, conf, Some(this)))
val buildSide: GpuBuildSide = if (GpuHashJoin.canBuildRight(join.joinType)) {
GpuBuildRight
Expand All @@ -41,15 +41,15 @@ class GpuSortMergeJoinMeta(
throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join")
}

override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

override def tagPlanForGpu(): Unit = {
// Use conditions from Hash Join
GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
join.condition)
conditionMeta)

if (!conf.enableReplaceSortMergeJoin) {
willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " +
Expand All @@ -74,20 +74,26 @@ class GpuSortMergeJoinMeta(
}

override def convertToGpu(): GpuExec = {
val condition = conditionMeta.map(_.convertToGpu())
val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) {
(condition, None)
} else {
(None, condition)
}
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val joinExec = GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
buildSide,
None,
joinCondition,
left,
right,
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
// For inner joins we can apply a post-join condition for any conditions that cannot be
// evaluated directly in a mixed join that leverages a cudf AST expression
filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
}
}
Loading