From 06784a462bb6c551af38e5f22b3e2cb7156541cb Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 16 Apr 2021 09:18:41 -0500 Subject: [PATCH] Fix divide-by-zero in GpuAverage with ansi mode (#2130) Signed-off-by: Alessandro Bellina --- .../src/main/python/hash_aggregate_test.py | 25 ++++++++++++++++++- .../spark/sql/rapids/AggregateFunctions.scala | 6 ++++- .../apache/spark/sql/rapids/arithmetic.scala | 12 ++++++--- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index d554f769adb..47963b4fc4b 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -204,6 +204,30 @@ def test_hash_grpby_avg(data_gen, conf): conf=conf ) +@ignore_order +@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn) +@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) +@pytest.mark.parametrize('ansi_enabled', ['true', 'false']) +def test_hash_grpby_avg_nulls(data_gen, conf, ansi_enabled): + conf.update({'spark.sql.ansi.enabled': ansi_enabled}) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: gen_df(spark, data_gen, length=100).groupby('a') + .agg(f.avg('c')), + conf=conf + ) + +@ignore_order +@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn) +@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) +@pytest.mark.parametrize('ansi_enabled', ['true', 'false']) +def test_hash_reduction_avg_nulls(data_gen, conf, ansi_enabled): + conf.update({'spark.sql.ansi.enabled': ansi_enabled}) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: gen_df(spark, data_gen, length=100) + .agg(f.avg('c')), + conf=conf + ) + # tracks https://github.com/NVIDIA/spark-rapids/issues/154 @approximate_float @ignore_order @@ -302,7 +326,6 @@ def test_hash_query_max_with_multiple_distincts(data_gen, conf, parameterless): 'count(distinct b) from hash_agg_table group by a', conf) - @ignore_order @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn) @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 6f6e55322ee..c6e6e5b9ce9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -442,9 +442,13 @@ case class GpuAverage(child: Expression) extends GpuDeclarativeAggregate // average = (0 + 1) and not 2 which is the rowcount of the projected column. override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum), new CudfSum(cudfCount)) + + // NOTE: this sets `failOnErrorOverride=false` in `GpuDivide` to force it not to throw + // divide-by-zero exceptions, even when ansi mode is enabled in Spark. + // This is to conform with Spark's behavior in the Average aggregate function. override lazy val evaluateExpression: GpuExpression = GpuDivide( GpuCast(cudfSum, DoubleType), - GpuCast(cudfCount, DoubleType)) + GpuCast(cudfCount, DoubleType), failOnErrorOverride = false) override lazy val initialValues: Seq[GpuLiteral] = Seq( GpuLiteral(0.0, DoubleType), diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala index b914b9378c2..931d219630b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala @@ -269,7 +269,8 @@ object GpuDivModLike { } trait GpuDivModLike extends CudfBinaryArithmetic { - lazy val failOnError: Boolean = ShimLoader.getSparkShims.shouldFailDivByZero() + lazy val failOnError: Boolean = + ShimLoader.getSparkShims.shouldFailDivByZero() override def nullable: Boolean = true @@ -330,13 +331,18 @@ object GpuDivideUtil { } // This is for doubles and floats... -case class GpuDivide(left: Expression, right: Expression) extends GpuDivModLike { +case class GpuDivide(left: Expression, right: Expression, + failOnErrorOverride: Boolean = ShimLoader.getSparkShims.shouldFailDivByZero()) + extends GpuDivModLike { + + override lazy val failOnError: Boolean = failOnErrorOverride + override def inputType: AbstractDataType = TypeCollection(DoubleType, DecimalType) override def symbol: String = "/" override def binaryOp: BinaryOp = (left.dataType, right.dataType) match { - case (_: DecimalType, _: DecimalType) => BinaryOp.DIV + case (_: DecimalType, _: DecimalType) => BinaryOp.DIV case _ => BinaryOp.TRUE_DIV }