From 838344d59038b81afc2f3028a524d02557df174a Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 19 Jan 2021 14:42:39 +0800 Subject: [PATCH 01/12] Support collect_list on GPU for windowing. Signed-off-by: Firestarman --- .../nvidia/spark/rapids/GpuOverrides.scala | 14 ++++++ .../com/nvidia/spark/rapids/TypeChecks.scala | 2 +- .../spark/sql/rapids/AggregateFunctions.scala | 43 ++++++++++++++++++- 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index f4ef7374eee..c2d6bf0969d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2170,6 +2170,20 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[MakeDecimal](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow) + }), + expr[CollectList]( + "Collect a list of elements", + /* It should be 'fullAgg' eventually but now only support windowing, so 'windowOnly' */ + ExprChecks.windowOnly(TypeSig.ARRAY.nested(TypeSig.integral + + TypeSig.STRUCT.nested(TypeSig.commonCudfTypes)), + TypeSig.ARRAY.nested(TypeSig.all), + Seq(ParamCheck("input", + TypeSig.integral + TypeSig.STRUCT.nested( + TypeSig.integral + TypeSig.STRING + TypeSig.TIMESTAMP), + TypeSig.all))), + (c, conf, p, r) => new ExprMeta[CollectList](c, conf, p, r) { + override def convertToGpu(): GpuExpression = GpuCollectList( + childExprs.head.convertToGpu(), c.mutableAggBufferOffset, c.inputAggBufferOffset) }) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 76915acaa70..98f8f3e11fc 100644 --- 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -985,7 +985,7 @@ object ExprChecks { } /** - * Window only operations. Spark does not support these operations as anythign but a window + * Window only operations. Spark does not support these operations as anything but a window * operation. */ def windowOnly( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index ec1db228972..17efa07299a 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -22,9 +22,9 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExprId, ImplicitCastInputTypes} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Complete, Final, Partial, PartialMerge} +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.util.TypeUtils -import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BooleanType, DataType, DoubleType, LongType, NumericType, StructType} +import org.apache.spark.sql.types._ trait GpuAggregateFunction extends GpuExpression { // using the child reference, define the shape of the vectors sent to @@ -529,3 +529,42 @@ abstract class GpuLastBase(child: Expression) override lazy val deterministic: Boolean = false override def toString: String = s"gpulast($child)${if (ignoreNulls) " ignore nulls"}" } + +/** + * Collects and returns a list of non-unique elements. + * + * FIXME Not sure whether GPU version requires the two offset parameters. Keep it here first. 
+ */ +case class GpuCollectList(child: Expression, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) + extends GpuDeclarativeAggregate with GpuAggregateWindowFunction { + + def this(child: Expression) = this(child, 0, 0) + + override lazy val deterministic: Boolean = false + + override def nullable: Boolean = false + + override def prettyName: String = "collect_list" + + override def dataType: DataType = ArrayType(child.dataType, false) + + override def children: Seq[Expression] = child :: Nil + + // WINDOW FUNCTION + override val windowInputProjection: Seq[Expression] = Seq(child) + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = + Aggregation.collect().onColumn(inputs.head._2) + + // Declarative aggregate. But for now 'CollectList' does not support it. + // The members as below should NOT be used yet, ensured by the "TypeCheck.windowOnly" + // when overriding the expression. + private lazy val cudfList = AttributeReference("collect_list", dataType)() + override val initialValues: Seq[GpuExpression] = Seq.empty + override val updateExpressions: Seq[Expression] = Seq.empty + override val mergeExpressions: Seq[GpuExpression] = Seq.empty + override val evaluateExpression: Expression = null + override val inputProjection: Seq[Expression] = Seq(child) + override def aggBufferAttributes: Seq[AttributeReference] = cudfList :: Nil +} From 54a9ed5fafc8c69f02856ce52a26cd3a7a97e48d Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 27 Jan 2021 14:31:07 +0800 Subject: [PATCH 02/12] Let collect_list support array of struct. Let collect_list with windowing support the array of struct type.
Signed-off-by: Firestarman --- .../nvidia/spark/rapids/GpuOverrides.scala | 21 +++++++++++-------- .../spark/rapids/GpuWindowExpression.scala | 20 ++++-------------- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index c2d6bf0969d..bbd993b5808 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -735,11 +735,11 @@ object GpuOverrides { "\"window\") of rows", ExprChecks.windowOnly( TypeSig.commonCudfTypes + TypeSig.DECIMAL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL), + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all, Seq(ParamCheck("windowFunction", TypeSig.commonCudfTypes + TypeSig.DECIMAL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL), + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all), ParamCheck("windowSpec", TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL, @@ -1644,11 +1644,13 @@ object GpuOverrides { expr[AggregateExpression]( "Aggregate expression", ExprChecks.fullAgg( - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.STRUCT), TypeSig.all, Seq(ParamCheck( "aggFunc", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.STRUCT), TypeSig.all)), Some(RepeatingParamCheck("filter", TypeSig.BOOLEAN, TypeSig.BOOLEAN))), (a, conf, p, r) => new ExprMeta[AggregateExpression](a, conf, p, r) { @@ -2174,12 +2176,11 @@ object GpuOverrides { expr[CollectList]( "Collect a list of elements", /* It should be 
'fullAgg' eventually but now only support windowing, so 'windowOnly' */ - ExprChecks.windowOnly(TypeSig.ARRAY.nested(TypeSig.integral + - TypeSig.STRUCT.nested(TypeSig.commonCudfTypes)), + ExprChecks.windowOnly( + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.STRUCT), TypeSig.ARRAY.nested(TypeSig.all), Seq(ParamCheck("input", - TypeSig.integral + TypeSig.STRUCT.nested( - TypeSig.integral + TypeSig.STRING + TypeSig.TIMESTAMP), + TypeSig.commonCudfTypes + TypeSig.STRUCT.nested(TypeSig.commonCudfTypes), TypeSig.all))), (c, conf, p, r) => new ExprMeta[CollectList](c, conf, p, r) { override def convertToGpu(): GpuExpression = GpuCollectList( @@ -2513,7 +2514,9 @@ object GpuOverrides { (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)), exec[WindowExec]( "Window-operator backend", - ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL, TypeSig.all), + ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT + + TypeSig.ARRAY.nested(TypeSig.STRUCT + TypeSig.commonCudfTypes), + TypeSig.all), (windowOp, conf, p, r) => new GpuWindowExecMeta(windowOp, conf, p, r) ), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 9700d462b0e..c3bc5e875f6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -199,14 +199,8 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } } - val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) - if (expectedType != aggColumn.getType) { - withResource(aggColumn) { aggColumn => - GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) - } - } else { - GpuColumnVector.from(aggColumn, windowFunc.dataType) - } + // Seems we should not cast the type explicitly here but let GpuColumnVector handle it. 
+ GpuColumnVector.from(aggColumn, windowFunc.dataType) } private def evaluateRangeBasedWindowExpression(cb : ColumnarBatch) : GpuColumnVector = { @@ -230,14 +224,8 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } } - val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) - if (expectedType != aggColumn.getType) { - withResource(aggColumn) { aggColumn => - GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) - } - } else { - GpuColumnVector.from(aggColumn, windowFunc.dataType) - } + // Seems we should not cast the type explicitly here but let GpuColumnVector handle it. + GpuColumnVector.from(aggColumn, windowFunc.dataType) } } From 371b83fae9a945fdff182e223583c3439f91fb6e Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 27 Jan 2021 15:51:41 +0800 Subject: [PATCH 03/12] Only allow array type for collect_list Signed-off-by: Firestarman --- .../scala/com/nvidia/spark/rapids/GpuOverrides.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index bbd993b5808..aac8ab53069 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1675,6 +1675,17 @@ object GpuOverrides { GpuAggregateExpression(childExprs(0).convertToGpu().asInstanceOf[GpuAggregateFunction], a.mode, a.isDistinct, filter.map(_.convertToGpu()), resultId) } + + // NOTE: Will remove this once all aggregates support array type. + override def tagExprForGpu(): Unit = { + // Only allow Array type for function "CollectList", since other aggregate functions + // have not been verified. 
+ wrapped.dataType match { + case _: ArrayType if !wrapped.aggregateFunction.isInstanceOf[CollectList] => + willNotWorkOnGpu("Now only 'collect_list' supports type of array.") + case _ => + } + } }), expr[SortOrder]( "Sort order", From 42a5408c3689d282f386ac2409f4566c8034ba04 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 27 Jan 2021 17:59:31 +0800 Subject: [PATCH 04/12] Restore the cast for non nested type. Signed-off-by: Firestarman --- .../spark/rapids/GpuWindowExpression.scala | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index c3bc5e875f6..5805cd4388c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -199,8 +199,16 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } } - // Seems we should not cast the type explicitly here but let GpuColumnVector handle it. - GpuColumnVector.from(aggColumn, windowFunc.dataType) + // For nested type, do not cast + aggColumn.getType match { + case dType if dType.isNestedType => + GpuColumnVector.from(aggColumn, windowFunc.dataType) + case _ => + val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) + withResource(aggColumn) { aggColumn => + GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) + } + } } private def evaluateRangeBasedWindowExpression(cb : ColumnarBatch) : GpuColumnVector = { @@ -224,8 +232,16 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } } - // Seems we should not cast the type explicitly here but let GpuColumnVector handle it. 
- GpuColumnVector.from(aggColumn, windowFunc.dataType) + // For nested type, do not cast + aggColumn.getType match { + case dType if dType.isNestedType => + GpuColumnVector.from(aggColumn, windowFunc.dataType) + case _ => + val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) + withResource(aggColumn) { aggColumn => + GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) + } + } } } From 63aa7a0ad390269deb3c0c34447ea05ff8fa8a56 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 28 Jan 2021 13:02:19 +0800 Subject: [PATCH 05/12] Updated the type check Signed-off-by: Firestarman --- .../nvidia/spark/rapids/GpuOverrides.scala | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index aac8ab53069..ac2a89dbf91 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -734,12 +734,11 @@ object GpuOverrides { "Calculates a return value for every input row of a table based on a group (or " + "\"window\") of rows", ExprChecks.windowOnly( - TypeSig.commonCudfTypes + TypeSig.DECIMAL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), + (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested + TypeSig.ARRAY.nested(TypeSig.STRUCT), TypeSig.all, Seq(ParamCheck("windowFunction", - TypeSig.commonCudfTypes + TypeSig.DECIMAL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), + (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested + + TypeSig.ARRAY.nested(TypeSig.STRUCT), TypeSig.all), ParamCheck("windowSpec", TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL, @@ -1644,13 +1643,13 @@ object GpuOverrides { expr[AggregateExpression]( "Aggregate expression", ExprChecks.fullAgg( - 
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.STRUCT), + (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested() + TypeSig.NULL + + TypeSig.ARRAY.nested(TypeSig.STRUCT), TypeSig.all, Seq(ParamCheck( "aggFunc", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.STRUCT), + (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested() + TypeSig.NULL + + TypeSig.ARRAY.nested(TypeSig.STRUCT), TypeSig.all)), Some(RepeatingParamCheck("filter", TypeSig.BOOLEAN, TypeSig.BOOLEAN))), (a, conf, p, r) => new ExprMeta[AggregateExpression](a, conf, p, r) { @@ -1675,17 +1674,6 @@ object GpuOverrides { GpuAggregateExpression(childExprs(0).convertToGpu().asInstanceOf[GpuAggregateFunction], a.mode, a.isDistinct, filter.map(_.convertToGpu()), resultId) } - - // NOTE: Will remove this once all aggregates support array type. - override def tagExprForGpu(): Unit = { - // Only allow Array type for function "CollectList", since other aggregate functions - // have not been verified. 
- wrapped.dataType match { - case _: ArrayType if !wrapped.aggregateFunction.isInstanceOf[CollectList] => - willNotWorkOnGpu("Now only 'collect_list' supports type of array.") - case _ => - } - } }), expr[SortOrder]( "Sort order", @@ -2185,13 +2173,13 @@ object GpuOverrides { GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow) }), expr[CollectList]( - "Collect a list of elements", + "Collect a list of elements, now only supported by windowing.", /* It should be 'fullAgg' eventually but now only support windowing, so 'windowOnly' */ ExprChecks.windowOnly( - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.STRUCT), + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.ARRAY.nested(TypeSig.all), Seq(ParamCheck("input", - TypeSig.commonCudfTypes + TypeSig.STRUCT.nested(TypeSig.commonCudfTypes), + (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested() + TypeSig.STRUCT, TypeSig.all))), (c, conf, p, r) => new ExprMeta[CollectList](c, conf, p, r) { override def convertToGpu(): GpuExpression = GpuCollectList( @@ -2525,8 +2513,8 @@ object GpuOverrides { (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)), exec[WindowExec]( "Window-operator backend", - ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT + - TypeSig.ARRAY.nested(TypeSig.STRUCT + TypeSig.commonCudfTypes), + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested() + TypeSig.ARRAY, TypeSig.all), (windowOp, conf, p, r) => new GpuWindowExecMeta(windowOp, conf, p, r) From bfef79558f8cb3931f4a940bc6a7cf90eef9374e Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 28 Jan 2021 13:31:01 +0800 Subject: [PATCH 06/12] Add integration tests for collect_list. Add integration tests for collect_list with windowing. 
Signed-off-by: Firestarman --- .../src/main/python/window_function_test.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 9bbf93a6e26..02c78e296d4 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -213,3 +213,53 @@ def test_window_aggs_for_ranges_of_dates(data_gen): ' range between 1 preceding and 1 following) as sum_c_asc ' 'from window_agg_table' ) + + +''' + Spark will drop nulls when collecting, but seems GPU does not, so exceptions come up. +E Caused by: java.lang.AssertionError: value at 350 is null +E at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:228) +E at ai.rapids.cudf.HostColumnVectorCore.getInt(HostColumnVectorCore.java:254) +E at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getInt(RapidsHostColumnVectorCore.java:109) +E at org.apache.spark.sql.vectorized.ColumnarArray.getInt(ColumnarArray.java:128) + + Now set nullable to false to pass the tests, once native supports dropping nulls, will set it to true. +''' +collect_data_gen = [ + ('a', RepeatSeqGen(LongGen(), length=20)), + ('b', IntegerGen()), + ('c_int', IntegerGen(nullable=False)), + ('c_long', LongGen(nullable=False)), + ('c_time', DateGen(nullable=False)), + ('c_string', StringGen(nullable=False)), + ('c_float', FloatGen(nullable=False)), + ('c_decimal', DecimalGen(nullable=False, precision=8, scale=3)), + ('c_struct', StructGen(nullable=False, children = [ + ['child_int', IntegerGen()], + ['child_time', DateGen()], + ['child_string', StringGen()], + ['child_decimal', DecimalGen(nullable=False, precision=8, scale=3)]]))] + +# SortExec does not support array type, so sort the result locally. 
+@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', [collect_data_gen], ids=idfn) +def test_window_aggs_for_rows_collect_list(data_gen): + assert_gpu_and_cpu_are_equal_sql( + lambda spark : gen_df(spark, data_gen, length=2048), + "window_collect_table", + 'select ' + ' collect_list(c_int) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_int, ' + ' collect_list(c_long) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_long, ' + ' collect_list(c_time) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_time, ' + ' collect_list(c_string) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_string, ' + ' collect_list(c_float) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_float, ' + ' collect_list(c_decimal) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_decimal, ' + ' collect_list(c_struct) over ' + ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_struct ' + 'from window_collect_table ') From 127ff2e8511a6aa33a9a2513af8264c258484141 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 28 Jan 2021 14:40:22 +0800 Subject: [PATCH 07/12] Doc updates Signed-off-by: Firestarman --- docs/configs.md | 1 + docs/supported_ops.md | 67 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 6ff58712230..f105fc7181e 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -246,6 +246,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Year|`year`|Returns the year from a date or timestamp|true|None| spark.rapids.sql.expression.AggregateExpression| |Aggregate 
expression|true|None| spark.rapids.sql.expression.Average|`avg`, `mean`|Average aggregate operator|true|None| +spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of elements, now only supported by windowing.|true|None| spark.rapids.sql.expression.Count|`count`|Count aggregate operator|true|None| spark.rapids.sql.expression.First|`first_value`, `first`|first aggregate operator|true|None| spark.rapids.sql.expression.Last|`last`, `last_value`|last aggregate operator|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 0e6d8b7bd93..0bbc2429662 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -745,9 +745,9 @@ Accelerator supports are described below. NS NS NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS -NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS @@ -15227,7 +15227,7 @@ Accelerator support is described below. NS NS NS -PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15269,7 +15269,7 @@ Accelerator support is described below. NS NS NS -PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15453,7 +15453,7 @@ Accelerator support is described below. S NS NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15495,7 +15495,7 @@ Accelerator support is described below. S NS NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15517,7 +15517,7 @@ Accelerator support is described below. S NS NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15559,7 +15559,7 @@ Accelerator support is described below. S NS NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15581,7 +15581,7 @@ Accelerator support is described below. 
S NS NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15623,7 +15623,7 @@ Accelerator support is described below. S NS NS -NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) NS NS NS @@ -15762,6 +15762,53 @@ Accelerator support is described below. +CollectList +`collect_list` +Collect a list of elements, now only supported by windowing. +None +window +input +S +S +S +S +S +S +S +S +S* +S +S* +NS +NS +NS +NS +NS +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +NS + + +result + + + + + + + + + + + + + + +PS* (missing nested NULL, BINARY, CALENDAR, ARRAY, MAP, UDT) + + + + + Count `count` Count aggregate operator From 971fcaa5bbce300d9cc8ad932db3be20388eff76 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 29 Jan 2021 10:09:47 +0800 Subject: [PATCH 08/12] Address some comments Signed-off-by: Firestarman --- docs/configs.md | 2 +- docs/supported_ops.md | 2 +- .../src/main/python/window_function_test.py | 3 ++- .../scala/com/nvidia/spark/rapids/GpuOverrides.scala | 8 +++++--- .../scala/com/nvidia/spark/rapids/TypeChecks.scala | 12 ++++++++++++ 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index f105fc7181e..5136957c841 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -246,7 +246,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Year|`year`|Returns the year from a date or timestamp|true|None| spark.rapids.sql.expression.AggregateExpression| |Aggregate expression|true|None| spark.rapids.sql.expression.Average|`avg`, `mean`|Average aggregate operator|true|None| -spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of elements, now only supported by windowing.|true|None| +spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of elements, now only supported by windowing.|false|This is disabled by default because for now the GPU collects null values to a list, but Spark 
does not. This will be fixed in future releases.| spark.rapids.sql.expression.Count|`count`|Count aggregate operator|true|None| spark.rapids.sql.expression.First|`first_value`, `first`|first aggregate operator|true|None| spark.rapids.sql.expression.Last|`last`, `last_value`|last aggregate operator|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 0bbc2429662..621cf4a3219 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -15765,7 +15765,7 @@ Accelerator support is described below. CollectList `collect_list` Collect a list of elements, now only supported by windowing. -None +This is disabled by default because for now the GPU collects null values to a list, but Spark does not. This will be fixed in future releases. window input S diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 02c78e296d4..3d854c4acb5 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -262,4 +262,5 @@ def test_window_aggs_for_rows_collect_list(data_gen): ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_decimal, ' ' collect_list(c_struct) over ' ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_struct ' - 'from window_collect_table ') + 'from window_collect_table ', + {'spark.rapids.sql.expression.CollectList': 'true'}) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index ac2a89dbf91..49b88a0d789 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2174,8 +2174,9 @@ object GpuOverrides { }), expr[CollectList]( "Collect a list of elements, now only supported by windowing.", - /* It 
should be 'fullAgg' eventually but now only support windowing, so 'windowOnly' */ - ExprChecks.windowOnly( + /* It should be 'fullAgg' eventually but now only support windowing, + so 'aggNotGroupByOrReduction' */ + ExprChecks.aggNotGroupByOrReduction( TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.ARRAY.nested(TypeSig.all), Seq(ParamCheck("input", @@ -2184,7 +2185,8 @@ object GpuOverrides { (c, conf, p, r) => new ExprMeta[CollectList](c, conf, p, r) { override def convertToGpu(): GpuExpression = GpuCollectList( childExprs.head.convertToGpu(), c.mutableAggBufferOffset, c.inputAggBufferOffset) - }) + }).disabledByDefault("for now the GPU collects null values to a list, but Spark does not." + + " This will be fixed in future releases.") ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap val expressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 98f8f3e11fc..819a7b780c2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -996,6 +996,18 @@ object ExprChecks { ExprChecksImpl(Map( (WindowAggExprContext, ContextChecks(outputCheck, sparkOutputSig, paramCheck, repeatingParamCheck)))) + + /** + * An aggregation check where window operations are supported by the plugin, but Spark + * also supports group by and reduction on these. + * This is now really for 'collect_list' which is only supported by windowing. 
+ */ + def aggNotGroupByOrReduction( + outputCheck: TypeSig, + sparkOutputSig: TypeSig, + paramCheck: Seq[ParamCheck] = Seq.empty, + repeatingParamCheck: Option[RepeatingParamCheck] = None): ExprChecks = + windowOnly(outputCheck, sparkOutputSig, paramCheck, repeatingParamCheck) } /** From 171c49592cc85ce6167d65c7e8e9c9fdfdccfbe5 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 29 Jan 2021 10:14:24 +0800 Subject: [PATCH 09/12] Comment update Signed-off-by: Firestarman --- .../org/apache/spark/sql/rapids/AggregateFunctions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 17efa07299a..fc61b2213ea 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -558,8 +558,8 @@ case class GpuCollectList(child: Expression, Aggregation.collect().onColumn(inputs.head._2) // Declarative aggregate. But for now 'CollectList' does not support it. - // The members as below should NOT be used yet, ensured by the "TypeCheck.windowOnly" - // when overriding the expression. + // The members as below should NOT be used yet, ensured by the + // "TypeCheck.aggNotGroupByOrReduction" when trying to override the expression. 
private lazy val cudfList = AttributeReference("collect_list", dataType)() override val initialValues: Seq[GpuExpression] = Seq.empty override val updateExpressions: Seq[Expression] = Seq.empty From 4592a81b760020dafd4002758fc50ead178894b9 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 1 Feb 2021 13:25:59 +0800 Subject: [PATCH 10/12] Addressed some new comments Signed-off-by: Firestarman --- docs/supported_ops.md | 94 +++++++++++++++++- .../src/main/python/window_function_test.py | 98 +++++++++++-------- .../spark/rapids/GpuWindowExpression.scala | 6 ++ .../com/nvidia/spark/rapids/TypeChecks.scala | 17 +++- .../spark/sql/rapids/AggregateFunctions.scala | 12 ++- 5 files changed, 173 insertions(+), 54 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 621cf4a3219..a4cf42fcc6c 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -15762,10 +15762,96 @@ Accelerator support is described below. -CollectList -`collect_list` -Collect a list of elements, now only supported by windowing. -This is disabled by default because for now the GPU collects null values to a list, but Spark does not. This will be fixed in future releases. +CollectList +`collect_list` +Collect a list of elements, now only supported by windowing. +This is disabled by default because for now the GPU collects null values to a list, but Spark does not. This will be fixed in future releases. 
+aggregation +input +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS + + +result + + + + + + + + + + + + + + +NS + + + + + +reduction +input +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS + + +result + + + + + + + + + + + + + + +NS + + + + + window input S diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 3d854c4acb5..e53d08169ca 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -215,52 +215,64 @@ def test_window_aggs_for_ranges_of_dates(data_gen): ) -''' - Spark will drop nulls when collecting, but seems GPU does not, so exceptions come up. -E Caused by: java.lang.AssertionError: value at 350 is null -E at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:228) -E at ai.rapids.cudf.HostColumnVectorCore.getInt(HostColumnVectorCore.java:254) -E at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getInt(RapidsHostColumnVectorCore.java:109) -E at org.apache.spark.sql.vectorized.ColumnarArray.getInt(ColumnarArray.java:128) - - Now set nullable to false to pass the tests, once native supports dropping nulls, will set it to true. 
-''' -collect_data_gen = [ - ('a', RepeatSeqGen(LongGen(), length=20)), - ('b', IntegerGen()), - ('c_int', IntegerGen(nullable=False)), - ('c_long', LongGen(nullable=False)), - ('c_time', DateGen(nullable=False)), - ('c_string', StringGen(nullable=False)), - ('c_float', FloatGen(nullable=False)), - ('c_decimal', DecimalGen(nullable=False, precision=8, scale=3)), - ('c_struct', StructGen(nullable=False, children = [ - ['child_int', IntegerGen()], - ['child_time', DateGen()], - ['child_string', StringGen()], - ['child_decimal', DecimalGen(nullable=False, precision=8, scale=3)]]))] +def _gen_data_for_collect(nullable=True): + return [ + ('a', RepeatSeqGen(LongGen(), length=20)), + ('b', IntegerGen()), + ('c_int', IntegerGen(nullable=nullable)), + ('c_long', LongGen(nullable=nullable)), + ('c_time', DateGen(nullable=nullable)), + ('c_string', StringGen(nullable=nullable)), + ('c_float', FloatGen(nullable=nullable)), + ('c_decimal', DecimalGen(nullable=nullable, precision=8, scale=3)), + ('c_struct', StructGen(nullable=nullable, children=[ + ['child_int', IntegerGen()], + ['child_time', DateGen()], + ['child_string', StringGen()], + ['child_decimal', DecimalGen(precision=8, scale=3)]]))] + + +_collect_sql_string =\ + ''' + select + collect_list(c_int) over + (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_int, + collect_list(c_long) over + (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_long, + collect_list(c_time) over + (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_time, + collect_list(c_string) over + (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_string, + collect_list(c_float) over + (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_float, + collect_list(c_decimal) over + (partition by a order by b,c_int rows between UNBOUNDED preceding 
and CURRENT ROW) as collect_decimal, + collect_list(c_struct) over + (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_struct + from window_collect_table + ''' # SortExec does not support array type, so sort the result locally. @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [collect_data_gen], ids=idfn) -def test_window_aggs_for_rows_collect_list(data_gen): +@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/1638") +def test_window_aggs_for_rows_collect_list(): assert_gpu_and_cpu_are_equal_sql( - lambda spark : gen_df(spark, data_gen, length=2048), + lambda spark : gen_df(spark, _gen_data_for_collect(), length=2048), "window_collect_table", - 'select ' - ' collect_list(c_int) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_int, ' - ' collect_list(c_long) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_long, ' - ' collect_list(c_time) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_time, ' - ' collect_list(c_string) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_string, ' - ' collect_list(c_float) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_float, ' - ' collect_list(c_decimal) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_decimal, ' - ' collect_list(c_struct) over ' - ' (partition by a order by b,c_int rows between UNBOUNDED preceding and CURRENT ROW) as collect_struct ' - 'from window_collect_table ', + _collect_sql_string, {'spark.rapids.sql.expression.CollectList': 'true'}) + + +''' + Spark will drop nulls when collecting, but seems GPU does not yet, so exceptions come up. 
+ Now set nullable to false to verify the current functionality without null values. + Once native supports dropping nulls, will enable the tests above and remove this one. +''' +# SortExec does not support array type, so sort the result locally. +@ignore_order(local=True) +def test_window_aggs_for_rows_collect_list_no_nulls(): + assert_gpu_and_cpu_are_equal_sql( + lambda spark : gen_df(spark, _gen_data_for_collect(False), length=2048), + "window_collect_table", + _collect_sql_string, + {'spark.rapids.sql.expression.CollectList': 'true'}) \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 5805cd4388c..564a1c469bb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -205,6 +205,9 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow GpuColumnVector.from(aggColumn, windowFunc.dataType) case _ => val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) + // The API 'castTo' will take care of the 'from' type and 'to' type, and + // just increase the reference count by one when they are the same. + // so it is OK to always call it here. withResource(aggColumn) { aggColumn => GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) } @@ -238,6 +241,9 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow GpuColumnVector.from(aggColumn, windowFunc.dataType) case _ => val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) + // The API 'castTo' will take care of the 'from' type and 'to' type, and + // just increase the reference count by one when they are the same. + // so it is OK to always call it here. 
withResource(aggColumn) { aggColumn => GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 819a7b780c2..4a98c089e33 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -1006,8 +1006,21 @@ object ExprChecks { outputCheck: TypeSig, sparkOutputSig: TypeSig, paramCheck: Seq[ParamCheck] = Seq.empty, - repeatingParamCheck: Option[RepeatingParamCheck] = None): ExprChecks = - windowOnly(outputCheck, sparkOutputSig, paramCheck, repeatingParamCheck) + repeatingParamCheck: Option[RepeatingParamCheck] = None): ExprChecks = { + val notWindowParamCheck = paramCheck.map { pc => + ParamCheck(pc.name, TypeSig.none, pc.spark) + } + val notWindowRepeat = repeatingParamCheck.map { pc => + RepeatingParamCheck(pc.name, TypeSig.none, pc.spark) + } + ExprChecksImpl(Map( + (GroupByAggExprContext, + ContextChecks(TypeSig.none, sparkOutputSig, notWindowParamCheck, notWindowRepeat)), + (ReductionAggExprContext, + ContextChecks(TypeSig.none, sparkOutputSig, notWindowParamCheck, notWindowRepeat)), + (WindowAggExprContext, + ContextChecks(outputCheck, sparkOutputSig, paramCheck, repeatingParamCheck)))) + } } /** diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index fc61b2213ea..4d55c3cf848 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -533,7 +533,8 @@ abstract class GpuLastBase(child: Expression) /** * Collects and returns a list of non-unique elements. * - * FIXME Not sure whether GPU version requires the two offset parameters. Keep it here first. 
+ * The two 'offset' parameters are not used by GPU version, but are here for the compatibility + * with the CPU version and automated checks. */ case class GpuCollectList(child: Expression, mutableAggBufferOffset: Int = 0, @@ -561,10 +562,11 @@ case class GpuCollectList(child: Expression, // The members as below should NOT be used yet, ensured by the // "TypeCheck.aggNotGroupByOrReduction" when trying to override the expression. private lazy val cudfList = AttributeReference("collect_list", dataType)() - override val initialValues: Seq[GpuExpression] = Seq.empty - override val updateExpressions: Seq[Expression] = Seq.empty - override val mergeExpressions: Seq[GpuExpression] = Seq.empty - override val evaluateExpression: Expression = null + // Make them lazy to avoid being initialized when creating a GpuCollectList. + override lazy val initialValues: Seq[GpuExpression] = throw new UnsupportedOperationException + override lazy val updateExpressions: Seq[Expression] = throw new UnsupportedOperationException + override lazy val mergeExpressions: Seq[GpuExpression] = throw new UnsupportedOperationException + override lazy val evaluateExpression: Expression = throw new UnsupportedOperationException override val inputProjection: Seq[Expression] = Seq(child) override def aggBufferAttributes: Seq[AttributeReference] = cudfList :: Nil } From d8c21216d8cdeefccab5983c0fce3cf39ed95667 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 1 Feb 2021 14:53:53 +0800 Subject: [PATCH 11/12] Add an empty line at the end Signed-off-by: Firestarman --- integration_tests/src/main/python/window_function_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index e53d08169ca..2743e18f194 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -275,4 +275,4 @@ def 
test_window_aggs_for_rows_collect_list_no_nulls(): lambda spark : gen_df(spark, _gen_data_for_collect(False), length=2048), "window_collect_table", _collect_sql_string, - {'spark.rapids.sql.expression.CollectList': 'true'}) \ No newline at end of file + {'spark.rapids.sql.expression.CollectList': 'true'}) From 3a5f94e2735be7685f314c606a63dcd7ad838be7 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 5 Feb 2021 09:28:04 +0800 Subject: [PATCH 12/12] Addressed some new comments again Signed-off-by: Firestarman --- .../src/main/python/window_function_test.py | 2 +- .../nvidia/spark/rapids/GpuOverrides.scala | 26 +++++++++++-------- .../spark/sql/rapids/AggregateFunctions.scala | 2 ++ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index d227fa259fd..129548b6c99 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -264,7 +264,7 @@ def _gen_data_for_collect(nullable=True): # SortExec does not support array type, so sort the result locally. 
@ignore_order(local=True) -@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/1638") +@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1638") def test_window_aggs_for_rows_collect_list(): assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, _gen_data_for_collect(), length=2048), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 14893632aa5..e5df922e22a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -774,11 +774,12 @@ object GpuOverrides { "Calculates a return value for every input row of a table based on a group (or " + "\"window\") of rows", ExprChecks.windowOnly( - (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested + TypeSig.ARRAY.nested(TypeSig.STRUCT), + TypeSig.commonCudfTypes + TypeSig.DECIMAL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all, Seq(ParamCheck("windowFunction", - (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested + - TypeSig.ARRAY.nested(TypeSig.STRUCT), + TypeSig.commonCudfTypes + TypeSig.DECIMAL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all), ParamCheck("windowSpec", TypeSig.CALENDAR + TypeSig.NULL + TypeSig.integral + TypeSig.DECIMAL, @@ -1683,13 +1684,13 @@ object GpuOverrides { expr[AggregateExpression]( "Aggregate expression", ExprChecks.fullAgg( - (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested() + TypeSig.NULL + - TypeSig.ARRAY.nested(TypeSig.STRUCT), + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all, Seq(ParamCheck( "aggFunc", - (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested() + TypeSig.NULL + - TypeSig.ARRAY.nested(TypeSig.STRUCT), + TypeSig.commonCudfTypes + 
TypeSig.NULL + TypeSig.DECIMAL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all)), Some(RepeatingParamCheck("filter", TypeSig.BOOLEAN, TypeSig.BOOLEAN))), (a, conf, p, r) => new ExprMeta[AggregateExpression](a, conf, p, r) { @@ -2267,13 +2268,14 @@ object GpuOverrides { }), expr[CollectList]( "Collect a list of elements, now only supported by windowing.", - /* It should be 'fullAgg' eventually but now only support windowing, - so 'aggNotGroupByOrReduction' */ + // It should be 'fullAgg' eventually but now only support windowing, + // so 'aggNotGroupByOrReduction' ExprChecks.aggNotGroupByOrReduction( TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.ARRAY.nested(TypeSig.all), Seq(ParamCheck("input", - (TypeSig.commonCudfTypes + TypeSig.DECIMAL).nested() + TypeSig.STRUCT, + TypeSig.commonCudfTypes + TypeSig.DECIMAL + + TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL), TypeSig.all))), (c, conf, p, r) => new ExprMeta[CollectList](c, conf, p, r) { override def convertToGpu(): GpuExpression = GpuCollectList( @@ -2621,7 +2623,9 @@ object GpuOverrides { exec[WindowExec]( "Window-operator backend", ExecChecks( - (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested() + TypeSig.ARRAY, + TypeSig.commonCudfTypes + TypeSig.DECIMAL + + TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL) + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT), TypeSig.all), (windowOp, conf, p, r) => new GpuWindowExecMeta(windowOp, conf, p, r) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 8db074f32ae..32da9614133 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -548,6 +548,8 @@ 
case class GpuCollectList(child: Expression, def this(child: Expression) = this(child, 0, 0) + // Both `CollectList` and `CollectSet` are non-deterministic since their results depend on the + // actual order of input rows. override lazy val deterministic: Boolean = false override def nullable: Boolean = false