Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GpuHashJoin on Structs #2173

Merged
merged 9 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 90 additions & 32 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,30 @@
pytest.param(FloatGen(nullable=False), marks=[incompat]),
pytest.param(DoubleGen(nullable=False), marks=[incompat])]

basic_struct_gen = StructGen([
['child' + str(ind), sub_gen]
for ind, sub_gen in enumerate([StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
BooleanGen(), DateGen(), TimestampGen(), null_gen, decimal_gen_default])],
nullable=False)

basic_struct_gen_with_no_null_child = StructGen([
['child' + str(ind), sub_gen]
for ind, sub_gen in enumerate([StringGen(nullable=False), ByteGen(nullable=False),
ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False),
BooleanGen(nullable=False), DateGen(nullable=False), TimestampGen(nullable=False)])],
nullable=False)

basic_struct_gen_with_floats = StructGen([['child0', FloatGen()], ['child1', DoubleGen()]], nullable=False)

struct_gens = [basic_struct_gen, basic_struct_gen_with_no_null_child]

double_gen = [pytest.param(DoubleGen(), marks=[incompat])]

_sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
'spark.sql.join.preferSortMergeJoin': 'True',
'spark.sql.shuffle.partitions': '2',
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.stableSort.enabled': 'true'
revans2 marked this conversation as resolved.
Show resolved Hide resolved
}

# For spark to insert a shuffled hash join it has to be enabled with
Expand Down Expand Up @@ -115,16 +133,6 @@ def do_join(spark):
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_struct_as_key(data_gen, join_type):
revans2 marked this conversation as resolved.
Show resolved Hide resolved
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)

# For spark to insert a shuffled hash join it has to be enabled with
# "spark.sql.join.preferSortMergeJoin" = "false" and both sides have to
# be larger than a broadcast hash join would want
Expand Down Expand Up @@ -177,27 +185,6 @@ def do_join(spark):
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=allow_negative_scale_of_decimal_conf)

@allow_non_gpu('BroadcastHashJoinExec', 'BroadcastExchangeExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_right_struct_as_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 50)
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=allow_negative_scale_of_decimal_conf)

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
# Not all join types can be translated to a broadcast join, but this tests them to be sure we
# can handle what spark is doing
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_broadcast_join_right_table_struct(data_gen, join_type):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(broadcast(right), left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
Expand Down Expand Up @@ -370,3 +357,74 @@ def do_join(spark):
# Even though Spark does not know the size of an RDD input so it will not do a broadcast join unless
# we tell it to, this is just to be safe
assert_gpu_and_cpu_are_equal_collect(do_join, {'spark.sql.autoBroadcastJoinThreshold': '1'})

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross'], ids=idfn)
def test_sortmerge_join_struct_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross'], ids=idfn)
def test_sortmerge_join_struct_mixed_key(data_gen, join_type):
def do_join(spark):
left = two_col_df(spark, data_gen, int_gen, length=500)
right = two_col_df(spark, data_gen, int_gen, length=500)
return left.join(right, (left.a == right.a) & (left.b == right.b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross'], ids=idfn)
def test_sortmerge_join_struct_mixed_key_with_null_filtering(data_gen, join_type):
def do_join(spark):
left = two_col_df(spark, data_gen, int_gen, length=500)
right = two_col_df(spark, data_gen, int_gen, length=500)
return left.join(right, (left.a == right.a) & (left.b == right.b), join_type)
conf = {'spark.sql.constraintPropagation.enabled': 'false', **_sortmerge_join_conf}
assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'Cross'], ids=idfn)
def test_broadcast_join_struct_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', struct_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'Cross'], ids=idfn)
sperlingxx marked this conversation as resolved.
Show resolved Hide resolved
def test_broadcast_join_struct_mixed_key(data_gen, join_type):
def do_join(spark):
left = two_col_df(spark, data_gen, int_gen, length=500)
right = two_col_df(spark, data_gen, int_gen, length=250)
return left.join(broadcast(right), (left.a == right.a) & (left.b == right.b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2140')
@pytest.mark.parametrize('data_gen', [basic_struct_gen_with_floats], ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Left', 'Right', 'Cross'], ids=idfn)
def test_sortmerge_join_struct_with_floats_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ case class GpuBroadcastHashJoinExec(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
revans2 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,20 @@ class Spark300Shims extends SparkShims {
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ case class GpuBroadcastHashJoinExec(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,20 @@ class Spark301Shims extends Spark300Shims {
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ case class GpuBroadcastHashJoinExec(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
withResource(combined) { combined =>
GpuColumnVector.from(combined)
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,20 @@ class Spark311Shims extends Spark301Shims {
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,8 @@ object GpuOverrides {
exec[BroadcastExchangeExec](
"The backend for broadcast exchange of data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL)
, TypeSig.all),
(exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)),
exec[BroadcastNestedLoopJoinExec](
"Implementation of join using brute force",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ abstract class GpuShuffledHashJoinBase(
combinedSize =
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(combined)
filterBuiltNullsIfNecessary(GpuColumnVector.from(combined))
}
}
}
Expand Down
Loading