Support legacy behavior of parameterless count (NVIDIA#1958)
* Return a Long col with 0 if agg is empty

Signed-off-by: Raza Jafri <[email protected]>

* addressed review comments

Signed-off-by: Raza Jafri <[email protected]>

* improved tests

Signed-off-by: Raza Jafri <[email protected]>

* added two counts

Signed-off-by: Raza Jafri <[email protected]>

* added comment

Signed-off-by: Raza Jafri <[email protected]>

Co-authored-by: Raza Jafri <[email protected]>
razajafri authored Mar 23, 2021
1 parent 80311bf commit 48bb24d
Showing 2 changed files with 49 additions and 10 deletions.
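
For context, a minimal PySpark sketch (not part of the commit) of the behavior under test. The config key comes from the diff below; the session setup here is assumed, and a real run against the RAPIDS plugin would set the plugin configs as well:

    from pyspark.sql import SparkSession

    # Assumed local session for illustration only.
    spark = (SparkSession.builder
             .config("spark.sql.legacy.allowParameterlessCount", "true")
             .getOrCreate())

    spark.range(5).createOrReplaceTempView("t")
    # With the legacy flag on, the parameterless form parses and counts all rows,
    # like count(*); on Spark 3.1+ with the flag off, it fails analysis.
    spark.sql("SELECT count() FROM t").show()
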
42 changes: 35 additions & 7 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -262,9 +262,11 @@ def test_hash_multiple_mode_query_avg_distincts(data_gen, conf):
@ignore_order
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs),
ids=idfn)
def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf, parameterless):
conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, data_gen, length=100),
"hash_agg_table",
@@ -274,6 +276,7 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
'sum(distinct a),' +
'count(distinct b),' +
'count(a),' +
'count(),' +
'sum(a),' +
'min(a),'+
'max(a) from hash_agg_table group by a',
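
Aside: the conditional xfail parametrization used throughout these tests, reduced to a standalone sketch. The version helper here is a hypothetical stub standing in for the plugin's is_before_spark_310:

    import pytest

    def is_before_spark_310():
        # Hypothetical stub; the real helper inspects the active Spark version.
        return False

    @pytest.mark.parametrize('parameterless', [
        'true',
        pytest.param('false', marks=pytest.mark.xfail(
            condition=not is_before_spark_310(),
            reason="parameterless count not supported by default in Spark 3.1+"))])
    def test_flag_values(parameterless):
        # In the real tests the 'false' case fails on Spark 3.1+, where count()
        # is rejected unless the legacy flag is enabled.
        assert parameterless in ('true', 'false')
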
@@ -286,12 +289,16 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs),
ids=idfn)
def test_hash_query_max_with_multiple_distincts(data_gen, conf):
@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
def test_hash_query_max_with_multiple_distincts(data_gen, conf, parameterless):
conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, data_gen, length=100),
"hash_agg_table",
'select max(c),' +
'sum(distinct a),' +
'count(),' +
'count(distinct b) from hash_agg_table group by a',
conf)

@@ -336,12 +343,16 @@ def test_hash_query_max_bug(data_gen):
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys,
_grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn)
def test_hash_agg_with_nan_keys(data_gen):
@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
def test_hash_agg_with_nan_keys(data_gen, parameterless):
_no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, data_gen, length=1024),
"hash_agg_table",
'select a, '
'count(*) as count_stars, '
'count() as count_parameterless, '
'count(b) as count_bees, '
'sum(b) as sum_of_bees, '
'max(c) as max_seas, '
@@ -380,7 +391,10 @@ def test_count_distinct_with_nan_floats(data_gen):


@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
def test_generic_reductions(data_gen):
@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
def test_generic_reductions(data_gen, parameterless):
_no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless})
assert_gpu_and_cpu_are_equal_collect(
# Coalesce and sort are to make sure that first and last, which are non-deterministic,
# become deterministic
@@ -391,9 +405,23 @@ def test_generic_reductions(data_gen):
'first(a)',
'last(a)',
'count(a)',
'count()',
'count(1)'),
conf = _no_nans_float_conf)
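
Aside on the coalesce-and-sort comment in test_generic_reductions above: forcing a single partition with an order inside it is what makes first() and last() deterministic. A sketch, assuming an existing session:

    df = spark.range(8).repartition(1).sortWithinPartitions("id")
    df.selectExpr("first(id)", "last(id)").show()  # deterministically 0 and 7
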

@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail(
condition=not is_before_spark_310(), reason="parameterless count not supported by default in Spark 3.1+"))])
def test_count(data_gen, parameterless):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.selectExpr(
'count(a)',
'count()',
'count()',
'count(1)'),
conf = {'spark.sql.legacy.allowParameterlessCount': parameterless})
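
A usage-level sketch of the equivalence test_count relies on (assumes the legacy flag is on; the duplicated count() above exercises two identical parameterless counts in a single projection):

    spark.conf.set("spark.sql.legacy.allowParameterlessCount", "true")
    spark.range(3).createOrReplaceTempView("t")
    row = spark.sql("SELECT count(), count(1), count(*) FROM t").head()
    assert row[0] == row[1] == row[2] == 3
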

@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
def test_distinct_count_reductions(data_gen):
assert_gpu_and_cpu_are_equal_collect(
17 changes: 14 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
import ai.rapids.cudf.NvtxColor
import ai.rapids.cudf.{NvtxColor, Scalar}
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate}
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, MapType, StructType}
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, MapType, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object AggregateUtils {
@@ -857,7 +857,7 @@ case class GpuHashAggregateExec(
// reduction merge or update aggregates functions are
val cvs = ArrayBuffer[GpuColumnVector]()
aggModeCudfAggregates.foreach { case (mode, aggs) =>
aggs.foreach {agg =>
aggs.foreach { agg =>
val aggFn = if ((mode == Partial || mode == Complete) && !merge) {
agg.updateReductionAggregate
} else {
@@ -871,6 +871,17 @@ }
}
}
}
// If cvs is empty, add a single row holding a zero value. The value itself is
// meaningless; the projection will add a zero column to the result set in the case
// of a parameter-less count.
// This fixes a bug in the plugin where a parameter-less count did not return the
// same result as Spark on the CPU.
// For more details see https://github.com/NVIDIA/spark-rapids/issues/1737
if (cvs.isEmpty) {
withResource(Scalar.fromLong(0L)) { ZERO =>
cvs += GpuColumnVector.from(cudf.ColumnVector.fromScalar(ZERO, 1), LongType)
}
}
new ColumnarBatch(cvs.toArray, cvs.head.getBase.getRowCount.toInt)
}
} finally {
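
A hedged repro sketch for the issue the comment above references (NVIDIA/spark-rapids#1737); it assumes a GPU-enabled session with the plugin loaded:

    spark.conf.set("spark.sql.legacy.allowParameterlessCount", "true")
    spark.range(4).createOrReplaceTempView("t")
    # A parameterless count can leave the GPU reduction with no cudf aggregate
    # columns (the empty-cvs branch above); the placeholder Long row keeps the
    # batch non-empty so the final projection matches the CPU result.
    assert spark.sql("SELECT count() FROM t").head()[0] == 4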
