diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index c584d79f288..8467bcbeef0 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -16,7 +16,6 @@

 from asserts import assert_gpu_and_cpu_are_equal_collect
 from data_gen import *
-from marks import *
 from pyspark.sql.types import *
 import pyspark.sql.functions as f
 from spark_session import is_before_spark_311
@@ -38,10 +37,7 @@ def test_single_orderby(data_gen, order):
     pytest.param(1),
     pytest.param(200)
 ])
-@pytest.mark.parametrize('stable_sort', [
-    pytest.param(True),
-    pytest.param(False)
-])
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
     pytest.param(StructGen([['child0', all_basic_struct_gen]]),
@@ -68,7 +64,7 @@ def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort
         **allow_negative_scale_of_decimal_conf,
         **{
             'spark.sql.shuffle.partitions': shuffle_parts,
-            'spark.rapids.sql.stableSort.enabled': stable_sort
+            'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'
         }
     })

@@ -113,12 +109,31 @@ def test_single_sort_in_part(data_gen, order):
         lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order),
         conf = allow_negative_scale_of_decimal_conf)

+
+@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('order', [
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
+], ids=idfn)
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
+def test_single_nested_sort_in_part(data_gen, order, stable_sort):
+    sort_conf = {'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'}
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order),
+        conf={**allow_negative_scale_of_decimal_conf, **sort_conf})
+
 orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen,
     pytest.param(float_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(),
         reason='Spark has -0.0 < 0.0 before Spark 3.1')),
     pytest.param(double_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(),
         reason='Spark has -0.0 < 0.0 before Spark 3.1')),
-    boolean_gen, timestamp_gen, date_gen, string_gen, null_gen] + decimal_gens
+    boolean_gen, timestamp_gen, date_gen, string_gen, null_gen, StructGen([('child0', long_gen)])] + decimal_gens
 @pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn)
 def test_multi_orderby(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -146,6 +161,16 @@ def test_orderby_with_processing_and_limit(data_gen):
         # avoid ambiguity in the order by statement for floating point by including a as a backup ordering column
         lambda spark : unary_op_df(spark, data_gen).orderBy(f.lit(100) - f.col('a'), f.col('a')).limit(100))

+
+# We are not trying all possibilities, just a single nested case so the query works.
+@pytest.mark.parametrize('data_gen', [StructGen([('child0', long_gen)])], ids=idfn)
+def test_single_nested_orderby_with_processing_and_limit(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        # include a as a backup ordering column so the overall ordering is unambiguous
+        lambda spark : unary_op_df(spark, data_gen)\
+            .orderBy(f.struct(f.lit(100) - f.col('a.child0')), f.col('a'))\
+            .limit(100))
+
 # We are not trying all possibilities, just doing a few with numbers so the query works.
 @pytest.mark.parametrize('data_gen', [byte_gen, long_gen, float_gen], ids=idfn)
 def test_single_orderby_with_skew(data_gen):
@@ -160,15 +185,38 @@
             .selectExpr('a'),
         conf = allow_negative_scale_of_decimal_conf)

+
+# We are not trying all possibilities, just a single nested case so the query works.
+@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
+def test_single_nested_orderby_with_skew(data_gen, stable_sort):
+    sort_conf = {'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'}
+    # When doing range partitioning, the upstream data is sampled to estimate the partition cutoff bounds.
+    # If the sampled data produces skewed partitions, those partitions are resampled for more data.
+    # This test tries to trigger that resampling.
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen) \
+            .selectExpr('a', 'random(1) > 0.5 as b') \
+            .repartition(f.col('b')) \
+            .orderBy(f.col('a')) \
+            .selectExpr('a'),
+        conf={**allow_negative_scale_of_decimal_conf, **sort_conf})
+
+
 # This is primarily to test the out of core sort with multiple batches. For this we set the data size to
 # be relatively large (1 MiB across all tasks) and the target size to be small (16 KiB). This means we
 # should see around 64 batches of data. So this is the most valid if there are less than 64 tasks
 # in the cluster, but it should still work even then.
-def test_large_orderby():
+@pytest.mark.parametrize('data_gen', [long_gen, StructGen([('child0', long_gen)])], ids=idfn)
+@pytest.mark.parametrize('stable_sort', ['STABLE', 'OUTOFCORE'], ids=idfn)
+def test_large_orderby(data_gen, stable_sort):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, long_gen, length=1024*128)\
+        lambda spark : unary_op_df(spark, data_gen, length=1024*128)\
             .orderBy(f.col('a')),
-        conf = {'spark.rapids.sql.batchSizeBytes': '16384'})
+        conf={
+            'spark.rapids.sql.batchSizeBytes': '16384',
+            'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'
+        })

 # This is similar to test_large_orderby, but here we want to test some types
 # that are not being sorted on, but are going along with it
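
Two notes on the mechanics the new tests lean on, for anyone who wants to poke at them outside the harness.

First, the skew tests: `orderBy` uses range partitioning, which samples the upstream data to pick partition boundaries, and `repartition(f.col('b'))` on a boolean column hashes every row into at most a handful of the shuffle partitions, so the sampler sees heavily skewed input and has to resample. A minimal sketch of that query shape, assuming only a plain local PySpark session (the RAPIDS plugin is not required just to build and run the query on CPU):

```python
# Sketch, not part of the patch: the skew-inducing shape used by
# test_single_orderby_with_skew and test_single_nested_orderby_with_skew.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master('local[*]').getOrCreate()

df = spark.range(10000).select(f.col('id').alias('a'))
skewed = (df
    .selectExpr('a', 'random(1) > 0.5 as b')  # random(1) is Spark SQL's seeded alias of rand
    .repartition(f.col('b'))  # hash partitioning on a boolean leaves most partitions empty
    .orderBy(f.col('a'))      # range partitioning now samples the skewed upstream data
    .selectExpr('a'))
skewed.collect()
```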
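Second, the batch arithmetic behind `test_large_orderby`: roughly 1 MiB of input with a 16 KiB target batch size gives 1,048,576 / 16,384 = 64 expected batches, which is what forces the out-of-core sort to merge many batches. Below is a standalone sketch of the same configuration toggle, assuming the RAPIDS Accelerator jar is on the classpath of the local session and that these confs can be set at runtime; the helper name `run_large_orderby` is ours, not part of the test harness:

```python
# Sketch, not part of the patch: mirrors the configs test_large_orderby sets.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

TARGET_BATCH_BYTES = 16 * 1024   # 16 KiB target batch size
TOTAL_BYTES = 1024 * 1024        # ~1 MiB of input across all tasks
assert TOTAL_BYTES // TARGET_BATCH_BYTES == 64  # expected batch count

spark = SparkSession.builder.master('local[*]').getOrCreate()

def run_large_orderby(stable_sort):
    # 'STABLE' forces the full stable sort; anything else ('OUTOFCORE' in the
    # tests) leaves the default out-of-core sort path enabled.
    spark.conf.set('spark.rapids.sql.batchSizeBytes', str(TARGET_BATCH_BYTES))
    spark.conf.set('spark.rapids.sql.stableSort.enabled',
                   str(stable_sort == 'STABLE').lower())
    df = spark.range(1024 * 128).select(f.col('id').alias('a'))
    return df.orderBy(f.col('a')).collect()

run_large_orderby('OUTOFCORE')
```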