From f991fd743c9b05f0d3b57834a095e7a9f24160f4 Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Mon, 11 Apr 2022 15:26:59 +0800
Subject: [PATCH] Support Collect-like Reduction Aggregations (#4992)

Enables collect aggregations under reduction context in spark-rapids

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
---
 docs/supported_ops.md                          | 68 +++++++++----------
 .../src/main/python/hash_aggregate_test.py     | 49 +++++++++----
 .../nvidia/spark/rapids/GpuOverrides.scala     |  8 +--
 .../spark/sql/rapids/AggregateFunctions.scala  | 30 +++++---
 4 files changed, 92 insertions(+), 63 deletions(-)

diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index eac820bb48a..942d324dfcd 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -14824,25 +14824,25 @@ are limited.
 `collect_list`
 Collect a list of non-unique elements, not supported in reduction
 None
-reduction
+aggregation
 input
+S
+S
+S
+S
+S
+S
+S
+S
+PS (UTC is only supported TZ for TIMESTAMP)
+S
+S
+S
 NS
 NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
+PS (UTC is only supported TZ for child TIMESTAMP; unsupported child types BINARY, CALENDAR, UDT)
+PS (UTC is only supported TZ for child TIMESTAMP; unsupported child types BINARY, CALENDAR, UDT)
+PS (UTC is only supported TZ for child TIMESTAMP; unsupported child types BINARY, CALENDAR, UDT)
 NS
@@ -14861,13 +14861,13 @@ are limited.
-NS
+PS (UTC is only supported TZ for child TIMESTAMP; unsupported child types BINARY, CALENDAR, UDT)
-aggregation
+reduction
 input
 S
 S
@@ -14983,25 +14983,25 @@ are limited.
 `collect_set`
 Collect a set of unique elements, not supported in reduction
 None
-reduction
+aggregation
 input
+S
+S
+S
+S
+S
+S
+S
+S
+PS (UTC is only supported TZ for TIMESTAMP)
+S
+S
+S
 NS
 NS
 NS
 NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
-NS
+PS (UTC is only supported TZ for child TIMESTAMP; unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT)
 NS
@@ -15020,13 +15020,13 @@ are limited.
-NS
+PS (UTC is only supported TZ for child TIMESTAMP; unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT)
-aggregation
+reduction
 input
 S
 S
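Per the updated table, `collect_list` and `collect_set` are now marked S/PS for the reduction context, i.e. when they appear as a whole-table aggregation without a GROUP BY. A minimal user-level sketch of that case (Scala; assumes a SparkSession named `spark` with the RAPIDS plugin enabled, and the column name is illustrative):

    import org.apache.spark.sql.functions.{collect_list, collect_set, sort_array}

    // No GROUP BY, so Spark plans these as reduction aggregations over the whole input.
    val df = spark.range(0, 100).selectExpr("id % 10 AS a")
    df.agg(
      collect_list("a"),            // non-unique elements, element order is not guaranteed
      sort_array(collect_set("a"))  // sort_array gives the de-duplicated result a stable order
    ).show(truncate = false)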
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 2c39b9a40bf..67f5621eacb 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1038,18 +1038,18 @@ def test_first_last_reductions_nested_types(data_gen):
 def test_generic_reductions(data_gen):
     local_conf = copy_and_update(_no_nans_float_conf, {'spark.sql.legacy.allowParameterlessCount': 'true'})
     assert_gpu_and_cpu_are_equal_collect(
-        # Coalesce and sort are to make sure that first and last, which are non-deterministic
-        # become deterministic
-        lambda spark : unary_op_df(spark, data_gen)\
-            .coalesce(1).selectExpr(
-            'min(a)',
-            'max(a)',
-            'first(a)',
-            'last(a)',
-            'count(a)',
-            'count()',
-            'count(1)'),
-        conf = local_conf)
+        # Coalesce and sort are to make sure that first and last, which are non-deterministic
+        # become deterministic
+        lambda spark : unary_op_df(spark, data_gen) \
+            .coalesce(1).selectExpr(
+            'min(a)',
+            'max(a)',
+            'first(a)',
+            'last(a)',
+            'count(a)',
+            'count()',
+            'count(1)'),
+        conf=local_conf)
 
 @pytest.mark.parametrize('data_gen', all_gen + _nested_gens, ids=idfn)
 def test_count(data_gen):
@@ -1083,6 +1083,31 @@ def test_arithmetic_reductions(data_gen):
             'avg(a)'),
         conf = _no_nans_float_conf)
 
+@pytest.mark.parametrize('data_gen',
+                         non_nan_all_basic_gens + decimal_gens + _nested_gens,
+                         ids=idfn)
+def test_collect_list_reductions(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: unary_op_df(spark, data_gen).selectExpr('collect_list(a)'),
+        conf=_no_nans_float_conf)
+
+_struct_only_nested_gens = [all_basic_struct_gen,
+                            StructGen([['child0', byte_gen], ['child1', all_basic_struct_gen]]),
+                            StructGen([])]
+@pytest.mark.parametrize('data_gen',
+                         non_nan_all_basic_gens + decimal_gens + _struct_only_nested_gens,
+                         ids=idfn)
+def test_collect_set_reductions(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: unary_op_df(spark, data_gen).selectExpr('sort_array(collect_set(a))'),
+        conf=_no_nans_float_conf)
+
+def test_collect_empty():
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.sql("select collect_list(null)"))
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.sql("select collect_set(null)"))
+
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', all_gen + _nested_gens, ids=idfn)
 def test_groupby_first_last(data_gen):
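The new test_collect_empty case above pins down the degenerate input: Spark's collect aggregations skip nulls, so collecting over an all-null (or empty) input yields an empty array rather than null. The same check, sketched in Scala for reference (assumes a SparkSession named `spark`):

    // Both results are empty arrays, not nulls, because collect_list/collect_set ignore null rows.
    spark.sql("SELECT collect_list(null) AS as_list, collect_set(null) AS as_set").show()

This also lines up with the GpuCollectBase.initialValues change further down, where the starting value becomes an empty-array literal.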
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index b6f626e4c90..044f8c4f1a8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3218,8 +3218,7 @@ object GpuOverrides extends Logging {
       }),
     expr[CollectList](
       "Collect a list of non-unique elements, not supported in reduction",
-      // GpuCollectList is not yet supported in Reduction context.
-      ExprChecks.aggNotReduction(
+      ExprChecks.fullAgg(
         TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL +
           TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP),
         TypeSig.ARRAY.nested(TypeSig.all),
@@ -3249,10 +3248,7 @@ object GpuOverrides extends Logging {
       }),
     expr[CollectSet](
       "Collect a set of unique elements, not supported in reduction",
-      // GpuCollectSet is not yet supported in Reduction context.
-      // Compared to CollectList, ArrayType and MapType are NOT supported in GpuCollectSet
-      // because underlying cuDF operator drop_list_duplicates doesn't support LIST type.
-      ExprChecks.aggNotReduction(
+      ExprChecks.fullAgg(
         TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL +
           TypeSig.STRUCT),
         TypeSig.ARRAY.nested(TypeSig.all),
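Note that the two ExprChecks.fullAgg signatures above differ on purpose: GpuCollectList accepts nested ARRAY and MAP children, while GpuCollectSet does not (matching the BINARY, CALENDAR, ARRAY, MAP, UDT exclusions in the docs table). A rough sketch of the user-visible consequence (Scala; hypothetical data, assumes the RAPIDS plugin is enabled):

    import org.apache.spark.sql.functions.{array, col, collect_list, collect_set}

    val df = spark.range(0, 10).select(array(col("id"), col("id") + 1).as("arr"))

    // ARRAY children are in GpuCollectList's signature, so this aggregation can be replaced on the GPU.
    df.agg(collect_list("arr")).collect()

    // ARRAY children are not in GpuCollectSet's signature, so this one is expected to fall back
    // to the CPU; the query still runs, just without GPU acceleration for the aggregate.
    df.agg(collect_set("arr")).collect()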
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
index 5bd4a8c3cbd..f26291f1796 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExprId, ImplicitCastInputTypes, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, TypeUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -352,32 +352,39 @@ class CudfMin(override val dataType: DataType) extends CudfAggregate {
 }
 
 class CudfCollectList(override val dataType: DataType) extends CudfAggregate {
-  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
-    throw new UnsupportedOperationException("CollectList is not yet supported in reduction")
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.collectList(), DType.LIST)
   override lazy val groupByAggregate: GroupByAggregation =
     GroupByAggregation.collectList()
   override val name: String = "CudfCollectList"
 }
 
 class CudfMergeLists(override val dataType: DataType) extends CudfAggregate {
-  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
-    throw new UnsupportedOperationException("MergeLists is not yet supported in reduction")
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.mergeLists(), DType.LIST)
   override lazy val groupByAggregate: GroupByAggregation =
     GroupByAggregation.mergeLists()
   override val name: String = "CudfMergeLists"
 }
 
 class CudfCollectSet(override val dataType: DataType) extends CudfAggregate {
-  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
-    throw new UnsupportedOperationException("CollectSet is not yet supported in reduction")
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (col: cudf.ColumnVector) => {
+      val collectSet = ReductionAggregation.collectSet(
+        NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL)
+      col.reduce(collectSet, DType.LIST)
+    }
   override lazy val groupByAggregate: GroupByAggregation =
     GroupByAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL)
   override val name: String = "CudfCollectSet"
 }
 
 class CudfMergeSets(override val dataType: DataType) extends CudfAggregate {
-  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
-    throw new UnsupportedOperationException("CudfMergeSets is not yet supported in reduction")
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (col: cudf.ColumnVector) => {
+      val mergeSets = ReductionAggregation.mergeSets(NullEquality.EQUAL, NaNEquality.UNEQUAL)
+      col.reduce(mergeSets, DType.LIST)
+    }
   override lazy val groupByAggregate: GroupByAggregation =
     GroupByAggregation.mergeSets(NullEquality.EQUAL, NaNEquality.UNEQUAL)
   override val name: String = "CudfMergeSets"
@@ -1578,8 +1585,9 @@ trait GpuCollectBase
   // WINDOW FUNCTION
   override val windowInputProjection: Seq[Expression] = Seq(child)
 
-  // Make them lazy to avoid being initialized when creating a GpuCollectOp.
-  override lazy val initialValues: Seq[Expression] = throw new UnsupportedOperationException
+  override val initialValues: Seq[Expression] = {
+    Seq(GpuLiteral.create(new GenericArrayData(Array.empty[Any]), dataType))
+  }
 
   override val inputProjection: Seq[Expression] = Seq(child)
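For the cuDF side of the change: the new reductionAggregate lambdas collapse an entire column into a single LIST scalar through ColumnVector.reduce. A standalone sketch of that call pattern (Scala; uses the ai.rapids.cudf Java bindings the plugin already depends on, with made-up sample values):

    import ai.rapids.cudf.{ColumnVector, DType, NaNEquality, NullEquality, NullPolicy, ReductionAggregation}

    val col = ColumnVector.fromBoxedInts(1, 2, 2, null, 3)
    // collect_list as a reduction: the whole column becomes one LIST scalar.
    val list = col.reduce(ReductionAggregation.collectList(), DType.LIST)
    // collect_set as a reduction: de-duplicated, with the same null/NaN handling as the
    // group-by variant used elsewhere in this file (nulls excluded, null == null, NaN != NaN).
    val set = col.reduce(
      ReductionAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL),
      DType.LIST)
    // cuDF objects wrap native memory and must be closed explicitly.
    set.close(); list.close(); col.close()

The initialValues change in GpuCollectBase pairs with this: an empty-array literal is a safe starting buffer value now that the reduction path can actually produce and merge LIST results.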