From 244dcb036529ffdba75517c40995e3efd4cffedb Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Mon, 3 May 2021 19:16:25 +0800 Subject: [PATCH] Support GpuHashJoin on Structs (#2173) Signed-off-by: sperlingxx --- docs/supported_ops.md | 16 +-- .../src/main/python/join_test.py | 133 +++++++++++++----- .../spark300/GpuBroadcastHashJoinExec.scala | 2 +- .../rapids/shims/spark300/Spark300Shims.scala | 9 +- .../spark301/GpuBroadcastHashJoinExec.scala | 2 +- .../rapids/shims/spark301/Spark301Shims.scala | 9 +- .../spark301db/GpuBroadcastHashJoinExec.scala | 2 +- .../spark311/GpuBroadcastHashJoinExec.scala | 2 +- .../rapids/shims/spark311/Spark311Shims.scala | 9 +- .../nvidia/spark/rapids/GpuOverrides.scala | 3 +- .../rapids/GpuShuffledHashJoinBase.scala | 2 +- .../sql/rapids/execution/GpuHashJoin.scala | 90 +++++++++--- 12 files changed, 207 insertions(+), 72 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 5896209edba..a51a4b117b0 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -561,9 +561,9 @@ Accelerator supports are described below. S NS NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -607,9 +607,9 @@ Accelerator supports are described below. S NS NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -676,9 +676,9 @@ Accelerator supports are described below. S NS NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -699,9 +699,9 @@ Accelerator supports are described below. 
S NS NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS -PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index e28f69d4b84..fcc960bc0b9 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -33,13 +33,30 @@ pytest.param(FloatGen(nullable=False), marks=[incompat]), pytest.param(DoubleGen(nullable=False), marks=[incompat])] +basic_struct_gen = StructGen([ + ['child' + str(ind), sub_gen] + for ind, sub_gen in enumerate([StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), + BooleanGen(), DateGen(), TimestampGen(), null_gen, decimal_gen_default])], + nullable=True) + +basic_struct_gen_with_no_null_child = StructGen([ + ['child' + str(ind), sub_gen] + for ind, sub_gen in enumerate([StringGen(nullable=False), ByteGen(nullable=False), + ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False), + BooleanGen(nullable=False), DateGen(nullable=False), TimestampGen(nullable=False)])], + nullable=True) + +basic_struct_gen_with_floats = StructGen([['child0', FloatGen()], ['child1', DoubleGen()]], nullable=False) + +struct_gens = [basic_struct_gen, basic_struct_gen_with_no_null_child] + double_gen = [pytest.param(DoubleGen(), marks=[incompat])] _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1', 'spark.sql.join.preferSortMergeJoin': 'True', 'spark.sql.shuffle.partitions': '2', - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true' - } + 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true', + } # For spark to insert a shuffled hash join it has to be enabled with # "spark.sql.join.preferSortMergeJoin" = "false" and both sides have to @@ -121,16 +138,6 @@ def do_join(spark): return left.join(right, left.key == right.r_key, join_type) assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) -@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull') -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn) -def test_sortmerge_join_struct_as_key(data_gen, join_type): - def do_join(spark): - left, right = create_df(spark, data_gen, 500, 500) - return left.join(right, left.a == right.r_a, join_type) - assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf) - # For spark to insert a shuffled hash join it has to be enabled with # "spark.sql.join.preferSortMergeJoin" = "false" and both sides have to # be larger than a broadcast hash join would want @@ -173,26 +180,6 @@ def do_join(spark): return left.join(broadcast(right), left.key == right.r_key, join_type) assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) -@allow_non_gpu('BroadcastHashJoinExec', 'BroadcastExchangeExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero') -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], 
ids=idfn) -def test_broadcast_join_right_array_as_key(data_gen, join_type): - def do_join(spark): - left, right = create_df(spark, data_gen, 500, 50) - return left.join(broadcast(right), left.a == right.r_a, join_type) - assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=allow_negative_scale_of_decimal_conf) - -@allow_non_gpu('BroadcastHashJoinExec', 'BroadcastExchangeExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull') -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) -def test_broadcast_join_right_struct_as_key(data_gen, join_type): - def do_join(spark): - left, right = create_df(spark, data_gen, 500, 50) - return left.join(broadcast(right), left.a == right.r_a, join_type) - assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=allow_negative_scale_of_decimal_conf) - @ignore_order(local=True) @pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn) # Not all join types can be translated to a broadcast join, but this tests them to be sure we @@ -376,3 +363,85 @@ def do_join(spark): # Even though Spark does not know the size of an RDD input so it will not do a broadcast join unless # we tell it to, this is just to be safe assert_gpu_and_cpu_are_equal_collect(do_join, {'spark.sql.autoBroadcastJoinThreshold': '1'}) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_sortmerge_join_struct_as_key(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(right, left.a == right.r_a, join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_sortmerge_join_struct_mixed_key(data_gen, join_type): + def do_join(spark): + left = two_col_df(spark, data_gen, int_gen, length=500) + right = two_col_df(spark, data_gen, int_gen, length=500) + return left.join(right, (left.a == right.a) & (left.b == right.b), join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_sortmerge_join_struct_mixed_key_with_null_filter(data_gen, join_type): + def do_join(spark): + left = two_col_df(spark, data_gen, int_gen, length=500) + right = two_col_df(spark, data_gen, int_gen, length=500) + return left.join(right, (left.a == right.a) & (left.b == right.b), join_type) + # Disable constraintPropagation to test null filter on built table with nullable structures. 
+ conf = {'spark.sql.constraintPropagation.enabled': 'false', **_sortmerge_join_conf} + assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_broadcast_join_right_struct_as_key(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(broadcast(right), left.a == right.r_a, join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_broadcast_join_right_struct_mixed_key(data_gen, join_type): + def do_join(spark): + left = two_col_df(spark, data_gen, int_gen, length=500) + right = two_col_df(spark, data_gen, int_gen, length=250) + return left.join(broadcast(right), (left.a == right.a) & (left.b == right.b), join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2140') +@pytest.mark.parametrize('data_gen', [basic_struct_gen_with_floats], ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_sortmerge_join_struct_with_floats_key(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(right, left.a == right.r_a, join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) + +@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn) +@pytest.mark.parametrize('join_type', ['FullOuter'], ids=idfn) +def test_sortmerge_join_struct_as_key_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 500) + return left.join(right, left.a == right.r_a, join_type) + assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index d58584aa5a7..7af0a3c3d34 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -151,7 +151,7 @@ case class GpuBroadcastHashJoinExec( GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) 
withResource(combined) { combined => - GpuColumnVector.from(combined) + filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) } } diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index 142ef6d95dd..c2f7711e486 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -216,17 +216,20 @@ class Spark300Shims extends SparkShims { GpuOverrides.exec[SortMergeJoinExec]( "Sort merge join, replacing with shuffled hash join", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), GpuOverrides.exec[BroadcastHashJoinExec]( "Implementation of join using broadcast data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), GpuOverrides.exec[ShuffledHashJoinExec]( "Implementation of join using hashed shuffled data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap } diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index 4d0b461f79a..881f12c8d88 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -149,7 +149,7 @@ case class GpuBroadcastHashJoinExec( GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) withResource(combined) { combined => - GpuColumnVector.from(combined) + filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) } } diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala index 1943f988724..dce480a1456 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala @@ -46,17 +46,20 @@ class Spark301Shims extends Spark300Shims { GpuOverrides.exec[SortMergeJoinExec]( "Sort merge join, replacing with shuffled hash join", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + 
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), GpuOverrides.exec[BroadcastHashJoinExec]( "Implementation of join using broadcast data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), GpuOverrides.exec[ShuffledHashJoinExec]( "Implementation of join using hashed shuffled data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)) } diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala index 5eb8e571596..875f91eed4c 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala @@ -148,7 +148,7 @@ case class GpuBroadcastHashJoinExec( GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) withResource(combined) { combined => - GpuColumnVector.from(combined) + filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) } } diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala index caa4070c2ee..f63b70408bc 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuBroadcastHashJoinExec.scala @@ -153,7 +153,7 @@ case class GpuBroadcastHashJoinExec( GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) withResource(combined) { combined => - GpuColumnVector.from(combined) + filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) } } diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala index 87ab7f4b3c8..56dba71750a 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala @@ -294,17 +294,20 @@ class Spark311Shims extends Spark301Shims { GpuOverrides.exec[SortMergeJoinExec]( "Sort merge join, replacing with shuffled hash join", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + 
TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), GpuOverrides.exec[BroadcastHashJoinExec]( "Implementation of join using broadcast data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), GpuOverrides.exec[ShuffledHashJoinExec]( "Implementation of join using hashed shuffled data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + ), TypeSig.all), (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a4709360313..4caa896185a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2842,7 +2842,8 @@ object GpuOverrides { exec[BroadcastExchangeExec]( "The backend for broadcast exchange of data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all), + TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL) + , TypeSig.all), (exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)), exec[BroadcastNestedLoopJoinExec]( "Implementation of join using brute force", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala index 6a9ab2410de..e7ab29acf1a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinBase.scala @@ -84,7 +84,7 @@ abstract class GpuShuffledHashJoinBase( combinedSize = GpuColumnVector.extractColumns(combined) .map(_.getBase.getDeviceMemorySize).sum.toInt - GpuColumnVector.from(combined) + filterBuiltNullsIfNecessary(GpuColumnVector.from(combined)) } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index d7f17532099..e18b003a35f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -63,19 +63,30 @@ object GpuHashJoin { rightKeys: Seq[Expression], condition: Option[Expression]): Unit = { val keyDataTypes = (leftKeys ++ rightKeys).map(_.dataType) - if (keyDataTypes.exists(dtype => - dtype.isInstanceOf[ArrayType] || dtype.isInstanceOf[StructType] - || dtype.isInstanceOf[MapType])) { - meta.willNotWorkOnGpu("Nested types in join keys are not supported") + if (keyDataTypes.exists(dType => + dType.isInstanceOf[ArrayType] || dType.isInstanceOf[MapType])) { + meta.willNotWorkOnGpu("ArrayType or MapType in join keys are not supported") + } + + def 
unSupportNonEqualCondition(): Unit = if (condition.isDefined) { + meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") + } + def unSupportStructKeys(): Unit = if (keyDataTypes.exists(_.isInstanceOf[StructType])) { + meta.willNotWorkOnGpu(s"$joinType joins currently do not support struct keys") + } JoinTypeChecks.tagForGpu(joinType, meta) joinType match { case _: InnerLike => - case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti => - if (condition.isDefined) { - meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") - } - case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") + case RightOuter | LeftOuter | LeftSemi | LeftAnti => + unSupportNonEqualCondition() + case FullOuter => + unSupportNonEqualCondition() + // FullOuter join cannot support struct keys due to the two issues below + // * https://github.com/NVIDIA/spark-rapids/issues/2126 + // * https://github.com/rapidsai/cudf/issues/7947 + unSupportStructKeys() + case _ => + meta.willNotWorkOnGpu(s"$joinType currently is not supported") } } @@ -156,6 +167,13 @@ trait GpuHashJoin extends GpuExec { } } + // For join types other than FullOuter, we simply set compareNullsEqual to true to handle + // struct keys with nullable children. Non-nested keys can also be processed correctly with + // compareNullsEqual = true, because we filter all null records from the build table before the join. + // For details, please refer to the issue: https://github.com/NVIDIA/spark-rapids/issues/2126 + protected lazy val compareNullsEqual: Boolean = (joinType != FullOuter) && + anyNullableStructChild(gpuBuildKeys) + /** * Place the columns in left and the columns in right into a single ColumnarBatch */ @@ -342,9 +360,11 @@ trait GpuHashJoin extends GpuExec { leftTable: Table, rightTable: Table, closeRightTable: Boolean): ColumnarBatch = { def withRightTable(body: Table => Table): Table = { + // Run the nullable check on the cuDF columns rather than the Spark schema, because the right + // table may have been filtered already (if it is the build side). 
val builtAnyNullable = - (joinType == LeftSemi || joinType == LeftAnti) && gpuBuildKeys.exists(_.nullable) - + (joinType == LeftSemi || joinType == LeftAnti) && + joinKeyIndices.exists(rightTable.getColumn(_).hasNulls) if (builtAnyNullable) { withResource(filterNulls(rightTable, joinKeyIndices, closeRightTable)) { filtered => body(filtered) @@ -363,17 +383,17 @@ trait GpuHashJoin extends GpuExec { val joinedTable = withRightTable { rt => joinType match { case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) - .leftJoin(rt.onColumns(joinKeyIndices: _*), false) + .leftJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) case RightOuter => rt.onColumns(joinKeyIndices: _*) - .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) + .leftJoin(leftTable.onColumns(joinKeyIndices: _*), compareNullsEqual) case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) - .innerJoin(rt.onColumns(joinKeyIndices: _*), false) + .innerJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) - .leftSemiJoin(rt.onColumns(joinKeyIndices: _*), false) + .leftSemiJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) - .leftAntiJoin(rt.onColumns(joinKeyIndices: _*), false) + .leftAntiJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) case FullOuter => leftTable.onColumns(joinKeyIndices: _*) - .fullJoin(rt.onColumns(joinKeyIndices: _*), false) + .fullJoin(rt.onColumns(joinKeyIndices: _*), compareNullsEqual) case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + s" supported") @@ -390,4 +410,40 @@ trait GpuHashJoin extends GpuExec { joinedTable.close() } } + + /** + * Filter out null values for the build-side keys, so as to ensure no nullable join columns are + * included in the built table when compareNullsEqual is true. + */ + protected def filterBuiltNullsIfNecessary(table: Table): Table = closeOnExcept(table) { t => + if (compareNullsEqual && gpuBuildKeys.exists(_.nullable)) { + filterNulls(t, joinKeyIndices, closeTable = true) + } else { + t + } + } + + /** + * Given a sequence of GPU expressions, detect whether any StructType expression contains + * nullable child columns. + * Since cuDF does not match nullable children the way Spark does during a join, we detect them + * before the join in order to apply workaround strategies. For details, please refer to the + * issue: https://github.com/NVIDIA/spark-rapids/issues/2126. + */ + private[this] def anyNullableStructChild(expressions: Seq[GpuExpression]): Boolean = { + def anyNullableChild(struct: StructType): Boolean = struct.fields.exists { field => + if (field.nullable) { + true + } else field.dataType match { + case structType: StructType => anyNullableChild(structType) + case _ => false + } + } + + expressions.exists { + case expression if expression.dataType.isInstanceOf[StructType] => + anyNullableChild(expression.dataType.asInstanceOf[StructType]) + case _ => false + } + } }
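
Reviewer note on the null-handling strategy in GpuHashJoin.scala: the compareNullsEqual comment condenses the key reasoning of this change. The sketch below restates it outside the plugin, purely as an illustration on plain Scala collections (none of these names come from the patch). The idea: Spark treats two struct keys whose corresponding child fields are both null as equal, while a key that is null at the top level never matches anything; cuDF exposes only a single nulls-equal switch per join, so the plugin enables it and compensates by filtering top-level null keys out of the build side before the join (for the non-FullOuter join types).

object StructKeyJoinSemanticsSketch {
  // A key is modeled as Option[Option[Int]]:
  //   outer None -> the whole struct key is null (must never match)
  //   inner None -> the struct exists but its child is null (Spark matches null children)
  type Key = Option[Option[Int]]

  // Spark-style equality for such keys: both structs must be non-null, and their children must
  // be equal, where two null children count as equal.
  def sparkEquals(l: Key, r: Key): Boolean = (l, r) match {
    case (Some(lc), Some(rc)) => lc == rc // Option equality treats None == None as true
    case _ => false                       // a null struct never matches
  }

  // cuDF-style equality with compareNullsEqual = true: every null, including a top-level one,
  // compares equal. This is why the build side must be filtered first.
  def nullsEqualEquals(l: Key, r: Key): Boolean = l == r

  def main(args: Array[String]): Unit = {
    val probe = Seq[Key](Some(Some(1)), Some(None), None)
    val build = Seq[Key](Some(Some(1)), Some(None), None, Some(Some(2)))

    // Reference result: inner join under Spark semantics.
    val expected = for (l <- probe; r <- build if sparkEquals(l, r)) yield (l, r)

    // Plugin strategy: drop top-level null keys from the build side, then join with nulls equal.
    val filteredBuild = build.filter(_.isDefined)
    val actual = for (l <- probe; r <- filteredBuild if nullsEqualEquals(l, r)) yield (l, r)

    assert(expected == actual)
    println(expected)
  }
}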
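
A second, smaller sketch (again illustrative only, with made-up object and value names) shows when the new anyNullableStructChild check would flip compareNullsEqual to true. It mirrors the recursion of the patch's inner anyNullableChild helper over plain Spark schemas, similar to the basic_struct_gen schemas added to join_test.py; only spark-catalyst types are needed to run it.

import org.apache.spark.sql.types._

object NullableStructChildSketch {
  // A struct key needs nulls-equal handling if any field, at any nesting depth, is nullable.
  def anyNullableChild(struct: StructType): Boolean = struct.fields.exists { field =>
    field.nullable || (field.dataType match {
      case s: StructType => anyNullableChild(s)
      case _ => false
    })
  }

  def main(args: Array[String]): Unit = {
    val withNullableChild = StructType(Seq(
      StructField("child0", StringType, nullable = true),
      StructField("child1", IntegerType, nullable = false)))
    val allNonNull = StructType(Seq(
      StructField("child0", StringType, nullable = false),
      StructField("nested", StructType(Seq(
        StructField("grandchild", LongType, nullable = false))), nullable = false)))

    println(anyNullableChild(withNullableChild)) // true  -> non-FullOuter GPU joins would use compareNullsEqual = true
    println(anyNullableChild(allNonNull))        // false -> a plain null-unequal join is enough
  }
}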