Fix hash-aggregate tests failing in ANSI mode
Fixes NVIDIA#11018.

This commit fixes the hash aggregate tests that fail with ANSI mode enabled.

These tests fail most visibly on Spark 4.0, where ANSI mode is enabled by default.

Signed-off-by: MithunR <[email protected]>
mythrocks committed Jul 16, 2024
1 parent 34e6bc8 commit ee605b8
Showing 1 changed file with 26 additions and 2 deletions.
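
For context, the fix leans on the test framework's `@disable_ansi_mode` decorator, which forces `spark.sql.ansi.enabled` off for the marked test so it keeps passing even where ANSI is on by default (Spark 4.0). Below is a minimal sketch of how such a pytest mark can be honored, assuming a session-scoped `spark` fixture and a hypothetical fixture name; the actual spark-rapids conftest wiring may differ:

import pytest

# Hypothetical stand-in for the mark used in the diff below; spark-rapids
# exposes it from its test framework as a plain pytest mark.
disable_ansi_mode = pytest.mark.disable_ansi_mode

@pytest.fixture(autouse=True)
def _ansi_mode_override(request, spark):
    # Tests marked @disable_ansi_mode run with ANSI forced off; the prior
    # value is restored afterwards so other tests see the session default.
    if request.node.get_closest_marker('disable_ansi_mode') is None:
        yield
        return
    original = spark.conf.get('spark.sql.ansi.enabled', 'false')
    spark.conf.set('spark.sql.ansi.enabled', 'false')
    try:
        yield
    finally:
        spark.conf.set('spark.sql.ansi.enabled', original)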
integration_tests/src/main/python/hash_aggregate_test.py (26 additions & 2 deletions)

@@ -29,8 +29,7 @@
 pytestmark = pytest.mark.nightly_resource_consuming_test
 
 _float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
-               'spark.rapids.sql.castStringToFloat.enabled': 'true'
-              }
+               'spark.rapids.sql.castStringToFloat.enabled': 'true'}
 
 _float_smallbatch_conf = copy_and_update(_float_conf,
         {'spark.rapids.sql.batchSizeBytes' : '250'})
@@ -348,6 +347,7 @@ def test_hash_reduction_sum_count_action(data_gen):
 
 # Make sure that we can do computation in the group by columns
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_computation_in_grpby_columns():
     conf = {'spark.rapids.sql.batchSizeBytes' : '250'}
     data_gen = [
@@ -418,6 +418,7 @@ def test_hash_grpby_avg(data_gen, conf):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.allow_non_gpu(
     'HashAggregateExec', 'AggregateExpression',
     'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
@@ -444,6 +445,7 @@ def test_intersectAll(data_gen):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list_with_decimalbig, ids=idfn)
 def test_exceptAll(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -587,13 +589,15 @@ def test_hash_pivot_groupby_duplicates_fallback(data_gen):
 
 # very simple test for just a count on decimals 128 values until we can support more with them
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_reduction(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: unary_op_df(spark, data_gen).selectExpr('count(a)'))
 
 # very simple test for just a count on decimals 128 values until we can support more with them
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_group_by(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -1133,6 +1137,7 @@ def test_hash_query_max_with_multiple_distincts(data_gen, conf):
         conf=local_conf)
 
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_count_with_filter(data_gen, conf):
@@ -1159,6 +1164,7 @@ def test_hash_multiple_filters(data_gen, conf):
 
 @approximate_float
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys,
                                       _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn)
 def test_hash_agg_with_nan_keys(data_gen):
@@ -1179,6 +1185,7 @@ def test_hash_agg_with_nan_keys(data_gen):
         local_conf)
 
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_structs_with_non_nested_children,
                                       _grpkey_nested_structs], ids=idfn)
 def test_hash_agg_with_struct_keys(data_gen):
@@ -1224,6 +1231,7 @@ def test_hash_agg_with_struct_of_array_fallback(data_gen):
 
 @approximate_float
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [ _grpkey_floats_with_nulls_and_nans ], ids=idfn)
 def test_count_distinct_with_nan_floats(data_gen):
     assert_gpu_and_cpu_are_equal_sql(
@@ -1256,6 +1264,7 @@ def test_first_last_reductions_nested_types(data_gen):
             'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'))
 
 @pytest.mark.parametrize('data_gen', _all_basic_gens_with_all_nans_cases, ids=idfn)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @allow_non_gpu(*non_utc_allow)
 def test_generic_reductions(data_gen):
     local_conf = copy_and_update(_float_conf, {'spark.sql.legacy.allowParameterlessCount': 'true'})
@@ -1274,6 +1283,7 @@ def test_generic_reductions(data_gen):
         conf=local_conf)
 
 @pytest.mark.parametrize('data_gen', all_gen + _nested_gens, ids=idfn)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @allow_non_gpu(*non_utc_allow)
 def test_count(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -1285,20 +1295,23 @@ def test_count(data_gen):
             'count(1)'),
         conf = {'spark.sql.legacy.allowParameterlessCount': 'true'})
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_distinct_count_reductions(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : binary_op_df(spark, data_gen).selectExpr(
             'count(DISTINCT a)'))
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
 def test_distinct_float_count_reductions(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : binary_op_df(spark, data_gen).selectExpr(
             'count(DISTINCT a)'))
 
 @approximate_float
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', numeric_gens + [decimal_gen_64bit, decimal_gen_128bit], ids=idfn)
 def test_arithmetic_reductions(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -1373,6 +1386,7 @@ def test_sorted_groupby_first_last(data_gen):
 # Spark has a sorting bug with decimals, see https://issues.apache.org/jira/browse/SPARK-40129.
 # Have pytest do the sorting rather than Spark as a workaround.
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('count_func', [f.count, f.countDistinct])
 @allow_non_gpu(*non_utc_allow)
@@ -1544,13 +1558,15 @@ def do_it(spark):
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_agg_nested_struct():
     def do_it(spark):
         df = two_col_df(spark, StringGen('k{1,5}'), StructGen([('aa', StructGen([('aaa', IntegerGen(min_val=0, max_val=4))]))]))
         return df.groupBy('a').agg(f.sum(df.b.aa.aaa))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_agg_nested_array():
     def do_it(spark):
         df = two_col_df(spark, StringGen('k{1,5}'), ArrayGen(StructGen([('aa', IntegerGen(min_val=0, max_val=4))])))
@@ -1559,6 +1575,7 @@ def do_it(spark):
 
 # The map here is a child not a top level, because we only support GetMapValue on String to String maps.
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_agg_nested_map():
     def do_it(spark):
         df = two_col_df(spark, StringGen('k{1,5}'), ArrayGen(MapGen(StringGen('a{1,5}', nullable=False), StringGen('[ab]{1,5}'))))
@@ -1599,6 +1616,7 @@ def test_hash_groupby_approx_percentile_byte(aqe_enabled):
         [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/11198
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
     conf = {'spark.sql.adaptive.enabled': aqe_enabled}
@@ -1679,8 +1697,10 @@ def test_hash_groupby_approx_percentile_decimal32():
                    ('v', DecimalGen(6, 2))]),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
+
 @incompat
 @ignore_order(local=True)
+@disable_ansi_mode # ANSI mode is tested with test_hash_groupby_approx_percentile_decimal_single_ansi.
 def test_hash_groupby_approx_percentile_decimal32_single():
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', RepeatSeqGen(ByteGen(nullable=False), length=2)),
@@ -1696,6 +1716,7 @@ def test_hash_groupby_approx_percentile_decimal64():
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
 @incompat
+@disable_ansi_mode # ANSI mode is tested with test_hash_groupby_approx_percentile_decimal_single_ansi.
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_decimal64_single():
     compare_percentile_approx(
@@ -1712,6 +1733,7 @@ def test_hash_groupby_approx_percentile_decimal128():
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
 @incompat
+@disable_ansi_mode # ANSI mode is tested with test_hash_groupby_approx_percentile_decimal_single_ansi.
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_decimal128_single():
     compare_percentile_approx(
@@ -2063,13 +2085,15 @@ def test_min_max_in_groupby_and_reduction(data_gen):
 # Some Spark implementations will optimize this aggregation as a
 # complete aggregation (i.e.: only one aggregation node in the plan)
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_hash_aggregate_complete_with_grouping_expressions():
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : spark.range(10).withColumn("id2", f.col("id")),
         "hash_agg_complete_table",
         "select id, avg(id) from hash_agg_complete_table group by id, id2 + 1")
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('cast_key_to', ["byte", "short", "int",
                                          "long", "string", "DECIMAL(38,5)"], ids=idfn)
 def test_hash_agg_force_pre_sort(cast_key_to):
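As a side note, the `copy_and_update` helper seen in the first hunk builds a per-test conf by merging overrides into a base conf without mutating it. A sketch of the assumed semantics, not the framework's actual definition:

def copy_and_update(base_conf, updates):
    # Shallow-copy first so the shared base conf dict stays reusable.
    merged = dict(base_conf)
    merged.update(updates)
    return merged

# Usage, mirroring the test file:
_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
               'spark.rapids.sql.castStringToFloat.enabled': 'true'}
_float_smallbatch_conf = copy_and_update(_float_conf,
        {'spark.rapids.sql.batchSizeBytes': '250'})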
