From 84f66a41bb761cd99066e1b67c0e55b7f6ac6029 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 17 Mar 2021 14:17:31 -0700 Subject: [PATCH 1/5] Return a Long col with 0 if agg is empty Signed-off-by: Raza Jafri --- .../com/nvidia/spark/rapids/aggregate.scala | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index b3b0fda3500..ee4427730b6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf -import ai.rapids.cudf.NvtxColor +import ai.rapids.cudf.{DType, NvtxColor, Scalar} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate} -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, MapType, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object AggregateUtils { @@ -857,7 +857,7 @@ case class GpuHashAggregateExec( // reduction merge or update aggregates functions are val cvs = ArrayBuffer[GpuColumnVector]() aggModeCudfAggregates.foreach { case (mode, aggs) => - aggs.foreach {agg => + aggs.foreach { agg => val aggFn = if ((mode == Partial || mode == Complete) && !merge) { agg.updateReductionAggregate } else { @@ -871,6 +871,17 @@ case class GpuHashAggregateExec( } } } + // If cvs is empty, we add a single row with zero value. + // This is to fix a bug in the plugin where a paramater-less count wasn't returning the + // desired result compared to Spark-CPU. + // For more details go to https://github.com/NVIDIA/spark-rapids/issues/1737 + if (cvs.isEmpty) { + withResource(Scalar.fromInt(0)) { ZERO => + withResource(cudf.ColumnVector.fromScalar(ZERO, 1)) { cv => + cvs += GpuColumnVector.from(cv.castTo(DType.INT64), LongType) + } + } + } new ColumnarBatch(cvs.toArray, cvs.head.getBase.getRowCount.toInt) } } finally { From 812ec2a85ca484ff6df89cad4c53112365910170 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 18 Mar 2021 13:00:35 -0700 Subject: [PATCH 2/5] addressed review comments Signed-off-by: Raza Jafri --- integration_tests/src/main/python/hash_aggregate_test.py | 6 +++++- .../src/main/scala/com/nvidia/spark/rapids/aggregate.scala | 6 ++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index fd8deda76e9..7a7d1d3e7aa 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -274,6 +274,7 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf): 'sum(distinct a),' + 'count(distinct b),' + 'count(a),' + + 'count(),' + 'sum(a),' + 'min(a),'+ 'max(a) from hash_agg_table group by a', @@ -292,6 +293,7 @@ def test_hash_query_max_with_multiple_distincts(data_gen, conf): "hash_agg_table", 'select max(c),' + 'sum(distinct a),' + + 'count(),' + 'count(distinct b) from hash_agg_table group by a', conf) @@ -341,7 +343,8 @@ def test_hash_agg_with_nan_keys(data_gen): lambda spark : gen_df(spark, data_gen, length=1024), "hash_agg_table", 'select a, ' - 'count(*) as count_stars, ' + 'count(*) as count_stars, ' + 'count() as count_paramaterless, ' 'count(b) as count_bees, ' 'sum(b) as sum_of_bees, ' 'max(c) as max_seas, ' @@ -391,6 +394,7 @@ def test_generic_reductions(data_gen): 'first(a)', 'last(a)', 'count(a)', + 'count()', 'count(1)'), conf = _no_nans_float_conf) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index ee4427730b6..3ab17c1cbae 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -876,10 +876,8 @@ case class GpuHashAggregateExec( // desired result compared to Spark-CPU. // For more details go to https://github.com/NVIDIA/spark-rapids/issues/1737 if (cvs.isEmpty) { - withResource(Scalar.fromInt(0)) { ZERO => - withResource(cudf.ColumnVector.fromScalar(ZERO, 1)) { cv => - cvs += GpuColumnVector.from(cv.castTo(DType.INT64), LongType) - } + withResource(Scalar.fromLong(0L)) { ZERO => + cvs += GpuColumnVector.from(cudf.ColumnVector.fromScalar(ZERO, 1), LongType) } } new ColumnarBatch(cvs.toArray, cvs.head.getBase.getRowCount.toInt) From 6f9774b1c31467bf5cbbb89e8fc4578ed9d4ae82 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 23 Mar 2021 11:11:13 -0700 Subject: [PATCH 3/5] improved tests Signed-off-by: Raza Jafri --- .../src/main/python/hash_aggregate_test.py | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 7a7d1d3e7aa..d418b951020 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -262,9 +262,11 @@ def test_hash_multiple_mode_query_avg_distincts(data_gen, conf): @ignore_order @incompat @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn) -@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), - ids=idfn) -def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf): +@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf, parameterless): + conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=100), "hash_agg_table", @@ -287,7 +289,10 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf): @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn) @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) -def test_hash_query_max_with_multiple_distincts(data_gen, conf): +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_hash_query_max_with_multiple_distincts(data_gen, conf, parameterless): + conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=100), "hash_agg_table", @@ -338,13 +343,16 @@ def test_hash_query_max_bug(data_gen): @ignore_order @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys, _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn) -def test_hash_agg_with_nan_keys(data_gen): +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_hash_agg_with_nan_keys(data_gen, parameterless): + _no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=1024), "hash_agg_table", 'select a, ' 'count(*) as count_stars, ' - 'count() as count_paramaterless, ' + 'count() as count_parameterless, ' 'count(b) as count_bees, ' 'sum(b) as sum_of_bees, ' 'max(c) as max_seas, ' @@ -383,7 +391,10 @@ def test_count_distinct_with_nan_floats(data_gen): @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) -def test_generic_reductions(data_gen): +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_generic_reductions(data_gen, parameterless): + _no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_collect( # Coalesce and sort are to make sure that first and last, which are non-deterministic # become deterministic @@ -398,6 +409,18 @@ def test_generic_reductions(data_gen): 'count(1)'), conf = _no_nans_float_conf) +@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_count(data_gen, parameterless): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen) \ + .selectExpr( + 'count(a)', + 'count()', + 'count(1)'), + conf = {'spark.sql.legacy.allowParameterlessCount': parameterless}) + @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) def test_distinct_count_reductions(data_gen): assert_gpu_and_cpu_are_equal_collect( From 41cabb9808b707d9c5f9bd9835f62d9d3bbb6fee Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 23 Mar 2021 12:07:13 -0700 Subject: [PATCH 4/5] added two counts Signed-off-by: Raza Jafri --- integration_tests/src/main/python/hash_aggregate_test.py | 1 + .../src/main/scala/com/nvidia/spark/rapids/aggregate.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index d418b951020..42026ad7e68 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -418,6 +418,7 @@ def test_count(data_gen, parameterless): .selectExpr( 'count(a)', 'count()', + 'count()', 'count(1)'), conf = {'spark.sql.legacy.allowParameterlessCount': parameterless}) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 3ab17c1cbae..a8a6f73b468 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf -import ai.rapids.cudf.{DType, NvtxColor, Scalar} +import ai.rapids.cudf.{NvtxColor, Scalar} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ From c40aa7a233951979d8335801a76bfc7dde76d447 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 23 Mar 2021 13:17:56 -0700 Subject: [PATCH 5/5] added comment Signed-off-by: Raza Jafri --- .../src/main/scala/com/nvidia/spark/rapids/aggregate.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index a8a6f73b468..7553604c58d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -871,7 +871,9 @@ case class GpuHashAggregateExec( } } } - // If cvs is empty, we add a single row with zero value. + // If cvs is empty, we add a single row with zero value. The value in the row is + // meaningless as it doesn't matter what we put in it. The projection will add a zero + // column to the result set in case of a parameter-less count. // This is to fix a bug in the plugin where a paramater-less count wasn't returning the // desired result compared to Spark-CPU. // For more details go to https://github.com/NVIDIA/spark-rapids/issues/1737