Add FullOuter support to GpuShuffledSymmetricHashJoinExec (#10272)
* Add FullOuter support to GpuShuffledSymmetricHashJoinExec

Signed-off-by: Jason Lowe <[email protected]>

* review comments

---------

Signed-off-by: Jason Lowe <[email protected]>
jlowe authored Jan 29, 2024
1 parent de59e2e commit 1fb6361
Showing 4 changed files with 175 additions and 60 deletions.
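For context, the path this commit extends is driven by the same configs the tests below set. A minimal PySpark sketch of exercising it (illustrative, not part of the commit: the two config keys come from the tests, while the session setup and column names are invented here, and the RAPIDS Accelerator plugin is assumed to already be on the classpath):

from pyspark.sql import SparkSession

# Illustrative setup; assumes the RAPIDS Accelerator jar and plugin are configured.
spark = (SparkSession.builder
    .config("spark.rapids.sql.join.useShuffledSymmetricHashJoin", "true")
    # Disable broadcast joins so the planner chooses a shuffled join, as the tests do.
    .config("spark.sql.autoBroadcastJoinThreshold", "1")
    .getOrCreate())

left = spark.createDataFrame([(1, "a"), (2, "b")], ["key1", "lval"])
right = spark.createDataFrame([(2, "x"), (3, "y")], ["key1", "rval"])

# FullOuter is the join type this commit newly routes to
# GpuShuffledSymmetricHashJoinExec when the config above is enabled.
left.join(right, ["key1"], "fullouter").show()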
13 changes: 9 additions & 4 deletions integration_tests/src/main/python/join_test.py
@@ -1153,11 +1153,13 @@ def do_join(spark):
         "spark.rapids.sql.input." + scan_name: False})
 
 @ignore_order(local=True)
+@pytest.mark.parametrize("join_type", ["Inner", "FullOuter"], ids=idfn)
 @pytest.mark.parametrize("is_left_host_shuffle", [False, True], ids=idfn)
 @pytest.mark.parametrize("is_right_host_shuffle", [False, True], ids=idfn)
 @pytest.mark.parametrize("is_left_smaller", [False, True], ids=idfn)
 @pytest.mark.parametrize("batch_size", ["1024", "1g"], ids=idfn)
-def test_new_inner_join(is_left_host_shuffle, is_right_host_shuffle, is_left_smaller, batch_size):
+def test_new_symmetric_join(join_type, is_left_host_shuffle, is_right_host_shuffle,
+                            is_left_smaller, batch_size):
     join_conf = {
         "spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true",
         "spark.sql.autoBroadcastJoinThreshold": "1",
@@ -1183,14 +1185,17 @@ def do_join(spark):
             left_df = left_df.groupBy("key1", "key2").max("ints", "floats")
         if not is_right_host_shuffle:
             right_df = right_df.groupBy("key1", "key2").max("doubles", "shorts")
-        return left_df.join(right_df, ["key1", "key2"], "inner")
+        return left_df.join(right_df, ["key1", "key2"], join_type)
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=join_conf)
 
 @ignore_order(local=True)
+@pytest.mark.parametrize("join_type", ["Inner", "FullOuter"], ids=idfn)
 @pytest.mark.parametrize("is_left_smaller", [False, True], ids=idfn)
 @pytest.mark.parametrize("is_ast_supported", [False, True], ids=idfn)
 @pytest.mark.parametrize("batch_size", ["1024", "1g"], ids=idfn)
-def test_new_inner_join_conditional(is_ast_supported, is_left_smaller, batch_size):
+def test_new_symmetric_join_conditional(join_type, is_ast_supported, is_left_smaller, batch_size):
+    if join_type == "FullOuter" and not is_ast_supported:
+        pytest.skip("Full outer joins do not support a non-AST condition")
     join_conf = {
         "spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true",
         "spark.sql.autoBroadcastJoinThreshold": "1",
@@ -1213,5 +1218,5 @@ def do_join(spark):
         else:
             # AST does not support logarithm yet
             cond.append(left_df.ints >= f.log(right_df.ints))
-        return left_df.join(right_df, cond, "inner")
+        return left_df.join(right_df, cond, join_type)
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=join_conf)
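The new skip above reflects that the GPU full outer join path only accepts join conditions expressible as cudf AST expressions; the non-AST branch of this test builds its condition around f.log, which AST does not support yet. A small sketch of the two condition shapes the test toggles between (the log branch mirrors the hunk above; the plain comparison in the AST branch is illustrative, since that branch is not visible in this hunk):

import pyspark.sql.functions as f

def build_condition(left_df, right_df, is_ast_supported):
    # Equi-join keys plus one extra inequality, as in the test's cond list.
    cond = [left_df.key1 == right_df.key1, left_df.key2 == right_df.key2]
    if is_ast_supported:
        # A plain comparison compiles to a cudf AST expression, so even a
        # full outer join can evaluate it on the GPU.
        cond.append(left_df.ints >= right_df.ints)
    else:
        # AST does not support logarithm yet, so the FullOuter case is skipped.
        cond.append(left_df.ints >= f.log(right_df.ints))
    return cond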
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, JoinType, LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
@@ -70,8 +70,9 @@ class GpuShuffledHashJoinMeta(
     }
     val Seq(left, right) = childPlans.map(_.convertIfNeeded())
     val joinExec = join.joinType match {
-      case Inner if conf.useShuffledSymmetricHashJoin =>
+      case Inner | FullOuter if conf.useShuffledSymmetricHashJoin =>
         GpuShuffledSymmetricHashJoinExec(
+          join.joinType,
          leftKeys.map(_.convertToGpu()),
          rightKeys.map(_.convertToGpu()),
          joinCondition,