From 48bb24dade2a401f88fb0ca3ce8229613e0ef6d7 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 23 Mar 2021 15:40:10 -0700
Subject: [PATCH] Support legacy behavior of parameterless count (#1958)

* Return a Long col with 0 if agg is empty

Signed-off-by: Raza Jafri

* addressed review comments

Signed-off-by: Raza Jafri

* improved tests

Signed-off-by: Raza Jafri

* added two counts

Signed-off-by: Raza Jafri

* added comment

Signed-off-by: Raza Jafri

Co-authored-by: Raza Jafri
---
 .../src/main/python/hash_aggregate_test.py    | 42 +++++++++++++++----
 .../com/nvidia/spark/rapids/aggregate.scala   | 17 ++++++--
 2 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 38896de8258..cc6ec4b490d 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -262,9 +262,11 @@ def test_hash_multiple_mode_query_avg_distincts(data_gen, conf):
 @ignore_order
 @incompat
 @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
-@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs),
-                         ids=idfn)
-def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
+@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
+@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
+    condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
+def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf, parameterless):
+    conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : gen_df(spark, data_gen, length=100),
         "hash_agg_table",
@@ -274,6 +276,7 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
         'sum(distinct a),' +
         'count(distinct b),' +
         'count(a),' +
+        'count(),' +
         'sum(a),' +
         'min(a),'+
         'max(a) from hash_agg_table group by a',
         conf)
@@ -286,12 +289,16 @@
 @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs),
                          ids=idfn)
-def test_hash_query_max_with_multiple_distincts(data_gen, conf):
+@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
+    condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
+def test_hash_query_max_with_multiple_distincts(data_gen, conf, parameterless):
+    conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : gen_df(spark, data_gen, length=100),
         "hash_agg_table",
         'select max(c),' +
         'sum(distinct a),' +
+        'count(),' +
         'count(distinct b) from hash_agg_table group by a',
         conf)
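
The tests above all toggle spark.sql.legacy.allowParameterlessCount, the Spark 3.1+ escape hatch that re-allows a bare count() with no arguments. A minimal standalone Scala sketch of what the flag does, assuming a local session; the LegacyCountDemo object name and the master setting are illustrative and not part of this patch, and with the flag enabled count() should count rows just like count(*):

    import org.apache.spark.sql.SparkSession

    object LegacyCountDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          // On Spark 3.1+ a bare count() fails analysis unless this legacy
          // flag is set; on Spark 3.0.x it is accepted by default.
          .config("spark.sql.legacy.allowParameterlessCount", "true")
          .getOrCreate()
        // All three counts are expected to return the same row count here.
        spark.sql("SELECT count(), count(*), count(1) FROM range(10)").show()
        spark.stop()
      }
    }
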
@@ -336,12 +343,16 @@ def test_hash_query_max_bug(data_gen):
 @ignore_order
 @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys,
                                       _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn)
-def test_hash_agg_with_nan_keys(data_gen):
+@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
+    condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
+def test_hash_agg_with_nan_keys(data_gen, parameterless):
+    _no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : gen_df(spark, data_gen, length=1024),
         "hash_agg_table",
         'select a, '
-        'count(*) as count_stars, '
+        'count(*) as count_stars, '
+        'count() as count_parameterless, '
         'count(b) as count_bees, '
         'sum(b) as sum_of_bees, '
         'max(c) as max_seas, '
@@ -380,7 +391,10 @@ def test_count_distinct_with_nan_floats(data_gen):
 @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
-def test_generic_reductions(data_gen):
+@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
+    condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
+def test_generic_reductions(data_gen, parameterless):
+    _no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
     assert_gpu_and_cpu_are_equal_collect(
         # Coalesce and sort are to make sure that first and last, which are non-deterministic
         # become deterministic
@@ -391,9 +405,23 @@ def test_generic_reductions(data_gen):
             'first(a)',
             'last(a)',
             'count(a)',
+            'count()',
             'count(1)'),
         conf = _no_nans_float_conf)
 
+@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
+@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
+    condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
+def test_count(data_gen, parameterless):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen) \
+            .selectExpr(
+                'count(a)',
+                'count()',
+                'count()',
+                'count(1)'),
+        conf = {'spark.sql.legacy.allowParameterlessCount': parameterless})
+
 @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
 def test_distinct_count_reductions(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
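
The Scala change that follows closes its temporary Scalar with the plugin's withResource helper. For readers unfamiliar with that pattern, here is a minimal stand-in, a sketch assuming only AutoCloseable; the plugin defines its own version of this helper with additional overloads, and the Arm object name below is illustrative:

    object Arm {
      // Loan pattern: hand an AutoCloseable to a function and guarantee that
      // close() runs whether the function returns normally or throws.
      def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
        try {
          block(resource)
        } finally {
          resource.close()
        }
      }
    }
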
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
index b3b0fda3500..7553604c58d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
 import scala.collection.mutable.ArrayBuffer
 
 import ai.rapids.cudf
-import ai.rapids.cudf.NvtxColor
+import ai.rapids.cudf.{NvtxColor, Scalar}
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate}
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, MapType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, MapType, StructType}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object AggregateUtils {
@@ -857,7 +857,7 @@ case class GpuHashAggregateExec(
         // reduction merge or update aggregates functions are
         val cvs = ArrayBuffer[GpuColumnVector]()
         aggModeCudfAggregates.foreach { case (mode, aggs) =>
-          aggs.foreach {agg =>
+          aggs.foreach { agg =>
             val aggFn = if ((mode == Partial || mode == Complete) && !merge) {
               agg.updateReductionAggregate
             } else {
@@ -871,6 +871,17 @@ case class GpuHashAggregateExec(
             }
           }
         }
+        // If cvs is empty, add a single row with a zero value. The value in the row is
+        // meaningless; it doesn't matter what we put in it. The projection will add a
+        // zero column to the result set in the case of a parameterless count.
+        // This fixes a bug in the plugin where a parameterless count wasn't returning
+        // the same result as Spark on the CPU.
+        // For more details see https://github.com/NVIDIA/spark-rapids/issues/1737
+        if (cvs.isEmpty) {
+          withResource(Scalar.fromLong(0L)) { ZERO =>
+            cvs += GpuColumnVector.from(cudf.ColumnVector.fromScalar(ZERO, 1), LongType)
+          }
+        }
         new ColumnarBatch(cvs.toArray, cvs.head.getBase.getRowCount.toInt)
       }
     } finally {
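
A standalone sketch of the fallback above, isolating the two cudf calls the patch relies on (Scalar.fromLong and ColumnVector.fromScalar). The PlaceholderColumnSketch object and method names are illustrative, withResource is replaced by plain try/finally, and actually running it requires the cudf Java bindings and a GPU:

    import ai.rapids.cudf.{ColumnVector, Scalar}

    object PlaceholderColumnSketch {
      // Build the one-row LONG64 column the patch appends when the aggregation
      // produced no columns. The value is never read; the row only gives the
      // final projection something to evaluate a parameterless count against.
      def placeholderColumn(): ColumnVector = {
        val zero = Scalar.fromLong(0L)
        try {
          ColumnVector.fromScalar(zero, 1) // replicate the scalar into 1 row
        } finally {
          zero.close() // fromScalar copies the value, so the scalar can be freed
        }
      }

      def main(args: Array[String]): Unit = {
        val col = placeholderColumn()
        try {
          assert(col.getRowCount == 1)
        } finally {
          col.close()
        }
      }
    }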