From b290127763f7f96bb88aa2be1ac7cf10e068e380 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 21 May 2021 13:11:48 -0500 Subject: [PATCH 1/3] Window tests with smaller batches Signed-off-by: Robert (Bobby) Evans --- integration_tests/src/main/python/data_gen.py | 5 ++- .../src/main/python/window_function_test.py | 41 ++++++++++--------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 19b1dcb6077..04475ba960e 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -515,7 +515,10 @@ def __init__(self, start=None, end=None, nullable=True): if start is None: # spark supports times starting at # "0001-01-01 00:00:00.000000" - start = datetime(1, 1, 1, tzinfo=timezone.utc) + # but if has issues if you get really close to that because it tries to do things + # in a differnt fromat which causes roundoff, so we have to add a few days, + # just to be sure + start = datetime(1, 1, 3, tzinfo=timezone.utc) elif not isinstance(start, datetime): raise RuntimeError('Unsupported type passed in for start {}'.format(start)) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index c4b63d2102d..7cf8582f94c 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -153,13 +153,17 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen): @ignore_order +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('data_gen', [ _grpkey_byte_with_nulls, _grpkey_short_with_nulls, _grpkey_int_with_nulls, _grpkey_long_with_nulls ], ids=idfn) -def test_window_aggs_for_range_numeric(data_gen): +def test_window_aggs_for_range_numeric(data_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.window.range.byte.enabled': True, + 'spark.rapids.sql.window.range.short.enabled': True} assert_gpu_and_cpu_are_equal_sql( lambda spark: gen_df(spark, data_gen, length=2048), "window_agg_table", @@ -192,11 +196,10 @@ def test_window_aggs_for_range_numeric(data_gen): ' (partition by a order by b asc ' ' range between UNBOUNDED preceding and UNBOUNDED following) as max_b_unbounded ' 'from window_agg_table ', - conf={'spark.rapids.sql.window.range.byte.enabled': True, - 'spark.rapids.sql.window.range.short.enabled': True}) - + conf = conf) @ignore_order +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls, _grpkey_longs_with_nulls, _grpkey_longs_with_timestamps, @@ -204,7 +207,9 @@ def test_window_aggs_for_range_numeric(data_gen): _grpkey_longs_with_decimals, _grpkey_longs_with_nullable_decimals, _grpkey_decimals_with_nulls], ids=idfn) -def test_window_aggs_for_rows(data_gen): +def test_window_aggs_for_rows(data_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=2048), "window_agg_table", @@ -224,7 +229,7 @@ def test_window_aggs_for_rows(data_gen): ' row_number() over ' ' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num ' 'from window_agg_table ', - conf = {'spark.rapids.sql.castFloatToDecimal.enabled': True}) + conf = conf) part_and_order_gens = [long_gen, DoubleGen(no_nans=True, special_cases=[]), @@ -240,10 +245,13 @@ def tmp(something): @ignore_order @approximate_float +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('c_gen', lead_lag_data_gens, ids=idfn) @pytest.mark.parametrize('b_gen', part_and_order_gens, ids=meta_idfn('orderBy:')) @pytest.mark.parametrize('a_gen', part_and_order_gens, ids=meta_idfn('partBy:')) -def test_multi_types_window_aggs_for_rows_lead_lag(a_gen, b_gen, c_gen): +def test_multi_types_window_aggs_for_rows_lead_lag(a_gen, b_gen, c_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.hasNans': False} data_gen = [ ('a', RepeatSeqGen(a_gen, length=20)), ('b', b_gen), @@ -270,7 +278,7 @@ def do_it(spark): .withColumn('lag_1_c', f.lag('c', 1).over(baseWindowSpec)) \ .withColumn('lag_def_c', f.lag('c', 4, defaultVal).over(baseWindowSpec)) \ .withColumn('row_num', f.row_number().over(baseWindowSpec)) - assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.rapids.sql.hasNans': 'false'}) + assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) lead_lag_array_data_gens =\ @@ -279,20 +287,14 @@ def do_it(spark): [ArrayGen(ArrayGen(ArrayGen(sub_gen, max_length=10), max_length=10), max_length=10) \ for sub_gen in lead_lag_data_gens] -# lead and lag are supported for arrays, but the other window operations like min and max are not right now -# once they are all supported the tests should be combined. -@pytest.mark.skip(reason="If some rows of order-by columns (here is a,b,c) are equal, then it may fail because" - "CPU and GPU can't guarantee the order for the same rows, while lead/lag is typically" - "depending on row's order. The solution is we should add the d and d_default columns" - "into the order-by to guarantee the order. But for now, sorting on array has not been" - "supported yet, see https://github.com/NVIDIA/spark-rapids/issues/2470." - "Once the issue is resolved, we should remove skip mark") @ignore_order(local=True) +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('d_gen', lead_lag_array_data_gens, ids=meta_idfn('agg:')) -@pytest.mark.parametrize('c_gen', [long_gen], ids=meta_idfn('orderBy:')) +@pytest.mark.parametrize('c_gen', [LongRangeGen()], ids=meta_idfn('orderBy:')) @pytest.mark.parametrize('b_gen', [long_gen], ids=meta_idfn('orderBy:')) @pytest.mark.parametrize('a_gen', [long_gen], ids=meta_idfn('partBy:')) -def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen): +def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size} data_gen = [ ('a', RepeatSeqGen(a_gen, length=20)), ('b', b_gen), @@ -310,7 +312,8 @@ def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen): LAG(d, 5) OVER (PARTITION by a ORDER BY b,c) lag_d_5, LAG(d, 2, d_default) OVER (PARTITION by a ORDER BY b,c) lag_d_2_default FROM window_agg_table - ''') + ''', + conf = conf) # lead and lag don't currently work for string columns, so redo the tests, but just for strings From ffcee67d12fc8e7b4276dda5aed8f654bf741e64 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 24 May 2021 09:56:11 -0500 Subject: [PATCH 2/3] Addressed review comments --- integration_tests/src/main/python/data_gen.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 04475ba960e..1e4743c5d44 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -453,14 +453,14 @@ class DateGen(DataGen): def __init__(self, start=None, end=None, nullable=True): super().__init__(DateType(), nullable=nullable) if start is None: - # spark supports times starting at + # Spark supports times starting at # "0001-01-01 00:00:00.000000" start = date(1, 1, 1) elif not isinstance(start, date): raise RuntimeError('Unsupported type passed in for start {}'.format(start)) if end is None: - # spark supports time through + # Spark supports time through # "9999-12-31 23:59:59.999999" end = date(9999, 12, 31) elif isinstance(end, timedelta): @@ -513,17 +513,17 @@ class TimestampGen(DataGen): def __init__(self, start=None, end=None, nullable=True): super().__init__(TimestampType(), nullable=nullable) if start is None: - # spark supports times starting at + # Spark supports times starting at # "0001-01-01 00:00:00.000000" # but if has issues if you get really close to that because it tries to do things - # in a differnt fromat which causes roundoff, so we have to add a few days, + # in a different format which causes roundoff, so we have to add a few days, # just to be sure start = datetime(1, 1, 3, tzinfo=timezone.utc) elif not isinstance(start, datetime): raise RuntimeError('Unsupported type passed in for start {}'.format(start)) if end is None: - # spark supports time through + # Spark supports time through # "9999-12-31 23:59:59.999999" end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) elif isinstance(end, timedelta): From b964c1ec8e76a4dd3a38ff2c5da171617e4e405b Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 24 May 2021 15:41:11 -0500 Subject: [PATCH 3/3] Update integration_tests/src/main/python/data_gen.py Co-authored-by: Gera Shegalov --- integration_tests/src/main/python/data_gen.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 1e4743c5d44..7d06c81d3a5 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -515,7 +515,7 @@ def __init__(self, start=None, end=None, nullable=True): if start is None: # Spark supports times starting at # "0001-01-01 00:00:00.000000" - # but if has issues if you get really close to that because it tries to do things + # but it has issues if you get really close to that because it tries to do things # in a different format which causes roundoff, so we have to add a few days, # just to be sure start = datetime(1, 1, 3, tzinfo=timezone.utc)