From c7483f2197741abc4d95e76d49c0bc5dfbf1b6b0 Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Thu, 9 Dec 2021 22:01:51 +0800 Subject: [PATCH] Support GpuFirst and GpuLast on nested types under reduction aggregations (#4337) Signed-off-by: sperlingxx --- docs/supported_ops.md | 24 +++++++------- .../src/main/python/hash_aggregate_test.py | 32 ++++++++----------- .../nvidia/spark/rapids/GpuOverrides.scala | 20 ++---------- 3 files changed, 27 insertions(+), 49 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 051c9a1906d..0597fe41d6f 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -14730,9 +14730,9 @@ are limited. S NS NS -NS -NS -NS +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
NS @@ -14751,9 +14751,9 @@ are limited. S NS NS -NS -NS -NS +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
NS @@ -14863,9 +14863,9 @@ are limited. S NS NS -NS -NS -NS +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
NS @@ -14884,9 +14884,9 @@ are limited. S NS NS -NS -NS -NS +PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
NS diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 406ec0fb694..9d236adcf71 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -1008,28 +1008,22 @@ def test_count_distinct_with_nan_floats(data_gen): _nested_gens = array_gens_sample + struct_gens_sample + map_gens_sample @pytest.mark.parametrize('data_gen', decimal_gens + decimal_128_gens, ids=idfn) -def test_first_last_reductions_extra_types(data_gen): +def test_first_last_reductions_decimal_types(data_gen): assert_gpu_and_cpu_are_equal_collect( - # Coalesce and sort are to make sure that first and last, which are non-deterministic - # become deterministic - lambda spark : unary_op_df(spark, data_gen)\ - .coalesce(1).selectExpr( - 'first(a)', - 'last(a)'), - conf = allow_negative_scale_of_decimal_conf) + # Coalesce is to make sure that first and last, which are non-deterministic + # become deterministic + lambda spark: unary_op_df(spark, data_gen).coalesce(1).selectExpr( + 'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'), + conf=allow_negative_scale_of_decimal_conf) -# TODO: https://github.com/NVIDIA/spark-rapids/issues/3221 -@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', - 'ShuffleExchangeExec', 'HashPartitioning', - 'AggregateExpression', 'Alias', 'First', 'Last') @pytest.mark.parametrize('data_gen', _nested_gens, ids=idfn) -def test_first_last_reductions_nested_types_fallback(data_gen): - assert_cpu_and_gpu_are_equal_collect_with_capture( - lambda spark: unary_op_df(spark, data_gen, num_slices=1)\ - .selectExpr('first(a)', 'last(a)', 'first(a, True)', 'last(a, True)'), - exist_classes='First,Last', - non_exist_classes='GpuFirst,GpuLast', - conf=allow_negative_scale_of_decimal_conf) +def test_first_last_reductions_nested_types(data_gen): + assert_gpu_and_cpu_are_equal_collect( + # Coalesce is to make sure 
that first and last, which are non-deterministic + # become deterministic + lambda spark: unary_op_df(spark, data_gen).coalesce(1).selectExpr( + 'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'), + conf=allow_negative_scale_of_decimal_conf) @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) @pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index d51fdb531db..58174ba1ccf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2294,14 +2294,7 @@ object GpuOverrides extends Logging { }), expr[First]( "first aggregate operator", { - val checks = ExprChecks.aggNotWindow( - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL, TypeSig.all, - Seq(ParamCheck("input", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL, TypeSig.all)) - ).asInstanceOf[ExprChecksImpl] - // TODO: support GpuFirst on nested types for reduction - // https://github.com/NVIDIA/spark-rapids/issues/3221 - val nestedChecks = ContextChecks( + ExprChecks.aggNotWindow( (TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL).nested(), TypeSig.all, @@ -2310,7 +2303,6 @@ object GpuOverrides extends Logging { TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL).nested(), TypeSig.all)) ) - ExprChecksImpl(checks.contexts ++ Map(GroupByAggExprContext -> nestedChecks)) }, (a, conf, p, r) => new AggExprMeta[First](a, conf, p, r) { override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = @@ -2321,14 +2313,7 @@ object GpuOverrides extends Logging { }), expr[Last]( "last aggregate operator", { - val checks = ExprChecks.aggNotWindow( - TypeSig.commonCudfTypes + TypeSig.NULL 
+ TypeSig.DECIMAL_128_FULL, TypeSig.all, - Seq(ParamCheck("input", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL, TypeSig.all)) - ).asInstanceOf[ExprChecksImpl] - // TODO: support GpuLast on nested types for reduction - // https://github.com/NVIDIA/spark-rapids/issues/3221 - val nestedChecks = ContextChecks( + ExprChecks.aggNotWindow( (TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL).nested(), TypeSig.all, @@ -2337,7 +2322,6 @@ object GpuOverrides extends Logging { TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL).nested(), TypeSig.all)) ) - ExprChecksImpl(checks.contexts ++ Map(GroupByAggExprContext -> nestedChecks)) }, (a, conf, p, r) => new AggExprMeta[Last](a, conf, p, r) { override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =