Skip to content

Commit

Permalink
Disable ANSI mode for window function tests.
Browse files Browse the repository at this point in the history
Fixes NVIDIA#11019.

Window function tests fail on Spark 4.0 because of NVIDIA#5114 (and NVIDIA#5120 broadly),
because spark-rapids does not support SUM, COUNT, and certain other aggregations
in ANSI mode.

This commit disables ANSI mode tests for the failing window function tests. These may be
revisited, once error/overflow checking is available for ANSI mode in spark-rapids.

Signed-off-by: MithunR <[email protected]>
  • Loading branch information
mythrocks committed Jun 17, 2024
1 parent 900ae6f commit 3db464b
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ def test_float_window_min_max_all_nans(data_gen):
.withColumn("max_b", f.max('a').over(w))
)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
@pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
def test_decimal128_count_window(data_gen):
Expand All @@ -177,6 +179,8 @@ def test_decimal128_count_window(data_gen):
' rows between 2 preceding and 10 following) as count_c_asc '
'from window_agg_table')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
@pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
def test_decimal128_count_window_no_part(data_gen):
Expand All @@ -189,6 +193,8 @@ def test_decimal128_count_window_no_part(data_gen):
' rows between 2 preceding and 10 following) as count_b_asc '
'from window_agg_table')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
@pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
def test_decimal_sum_window(data_gen):
Expand All @@ -201,6 +207,8 @@ def test_decimal_sum_window(data_gen):
' rows between 2 preceding and 10 following) as sum_c_asc '
'from window_agg_table')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
@pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
def test_decimal_sum_window_no_part(data_gen):
Expand All @@ -214,6 +222,7 @@ def test_decimal_sum_window_no_part(data_gen):
'from window_agg_table')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
@pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
def test_decimal_running_sum_window(data_gen):
Expand All @@ -227,6 +236,8 @@ def test_decimal_running_sum_window(data_gen):
'from window_agg_table',
conf = {'spark.rapids.sql.batchSizeBytes': '100'})


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
@pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
def test_decimal_running_sum_window_no_part(data_gen):
Expand Down Expand Up @@ -302,6 +313,7 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen):
'from window_agg_table')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
Expand Down Expand Up @@ -352,6 +364,7 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
conf = conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
Expand Down Expand Up @@ -396,6 +409,7 @@ def test_window_aggs_for_rows(data_gen, batch_size):
conf = conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
@pytest.mark.parametrize('data_gen', [
Expand Down Expand Up @@ -482,6 +496,8 @@ def test_window_batched_unbounded(b_gen, batch_size):
validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'],
conf = conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this.
# the order returned should be consistent because the data ends up in a single task (no partitioning)
Expand Down Expand Up @@ -520,6 +536,7 @@ def test_rows_based_running_window_unpartitioned(b_gen, batch_size):
conf = conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Testing multiple batch sizes.
@pytest.mark.parametrize('a_gen', integral_gens + [string_gen, date_gen, timestamp_gen], ids=meta_idfn('data:'))
@allow_non_gpu(*non_utc_allow)
Expand Down Expand Up @@ -694,6 +711,7 @@ def test_window_running_rank(data_gen):
conf = conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this.
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
Expand Down Expand Up @@ -738,6 +756,8 @@ def test_rows_based_running_window_partitioned(b_gen, c_gen, batch_size):
conf = conf)



@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Test different batch sizes.
@pytest.mark.parametrize('part_gen', [int_gen, long_gen], ids=idfn) # Partitioning is not really the focus of the test.
Expand Down Expand Up @@ -805,6 +825,7 @@ def must_test_sum_aggregation(gen):
conf=conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# Test that we can do a running window sum on floats and doubles and decimal. This becomes problematic because we do the agg in parallel
# which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations.
# We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have
Expand Down Expand Up @@ -836,6 +857,7 @@ def test_window_running_float_decimal_sum(batch_size):
conf = conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@approximate_float
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Test different batch sizes.
Expand Down Expand Up @@ -879,6 +901,7 @@ def window(oby_column):
conf=conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
Expand Down Expand Up @@ -1000,6 +1023,7 @@ def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen):
''')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# lead and lag don't currently work for string columns, so redo the tests, but just for strings
# without lead and lag
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
Expand Down Expand Up @@ -1107,6 +1131,8 @@ def test_window_aggs_lag_ignore_nulls_fallback(a_gen, b_gen, c_gen, d_gen):
FROM window_agg_table
''')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# Test for RANGE queries, with timestamp order-by expressions.
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
Expand Down Expand Up @@ -1155,6 +1181,7 @@ def test_window_aggs_for_ranges_timestamps(data_gen):
conf = {'spark.rapids.sql.castFloatToDecimal.enabled': True})


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
Expand Down Expand Up @@ -1201,6 +1228,7 @@ def test_window_aggregations_for_decimal_and_float_ranges(data_gen):
conf={})


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
Expand Down Expand Up @@ -1306,6 +1334,7 @@ def test_window_aggs_for_rows_collect_list():
conf={'spark.rapids.sql.window.collectList.enabled': True})


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# SortExec does not support array type, so sort the result locally.
@ignore_order(local=True)
# This test is more directed at Databricks and their running window optimization instead of ours
Expand Down Expand Up @@ -1347,6 +1376,8 @@ def test_running_window_function_exec_for_all_aggs():
''',
conf={'spark.rapids.sql.window.collectList.enabled': True})


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
# Test the Databricks WindowExec which combines a WindowExec with a ProjectExec and provides the output
# fields that we need to handle with an extra GpuProjectExec and we need the input expressions to compute
# a window function of another window function case
Expand Down Expand Up @@ -1668,6 +1699,8 @@ def do_it(spark):

assert_gpu_fallback_collect(do_it, 'WindowExec')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
# single-level structs (no nested structs) are now supported by the plugin
@pytest.mark.parametrize('part_gen', [StructGen([["a", long_gen]])], ids=meta_idfn('partBy:'))
Expand Down Expand Up @@ -1731,6 +1764,8 @@ def do_it(spark):

assert_gpu_and_cpu_are_equal_collect(do_it)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order
def test_unbounded_to_unbounded_window():
# This is specifically to test a bug that caused overflow issues when calculating
Expand Down Expand Up @@ -1784,6 +1819,7 @@ def test_window_first_last_nth_ignore_nulls(data_gen):
'FROM window_agg_table')


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@tz_sensitive_test
@allow_non_gpu(*non_supported_tz_allow)
@ignore_order(local=True)
Expand Down Expand Up @@ -1825,6 +1861,7 @@ def test_to_date_with_window_functions():
)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
Expand Down Expand Up @@ -1881,6 +1918,7 @@ def spark_bugs_in_decimal_sorting():
return v < "3.1.4" or v < "3.3.1" or v < "3.2.3" or v < "3.4.0"


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1g'], ids=idfn)
Expand Down Expand Up @@ -1925,6 +1963,7 @@ def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size):
conf=conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
@pytest.mark.parametrize('data_gen', [
Expand Down Expand Up @@ -1964,6 +2003,7 @@ def test_window_aggs_for_batched_finite_row_windows_partitioned(data_gen, batch_
conf=conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
@pytest.mark.parametrize('data_gen', [
Expand Down Expand Up @@ -2003,6 +2043,7 @@ def test_window_aggs_for_batched_finite_row_windows_unpartitioned(data_gen, batc
conf=conf)


@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [_grpkey_int_with_nulls,], ids=idfn)
def test_window_aggs_for_batched_finite_row_windows_fallback(data_gen):
Expand Down

0 comments on commit 3db464b

Please sign in to comment.