From f291564b7176d4dee86684cc2dd3d226a0e18a81 Mon Sep 17 00:00:00 2001
From: Kuhu Shukla
Date: Wed, 26 Aug 2020 15:57:53 +0000
Subject: [PATCH] Incorrect output from averages with filters in partial only mode

Signed-off-by: Kuhu Shukla
---
 .../src/main/python/hash_aggregate_test.py    | 19 +--------
 .../spark/sql/rapids/AggregateFunctions.scala | 17 ++++++--
 .../spark/rapids/HashAggregatesSuite.scala    | 41 +++++++++++++++++++
 .../rapids/SparkQueryCompareTestSuite.scala   |  8 ++++
 4 files changed, 64 insertions(+), 21 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index e0864a77377..b50d12c85af 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -280,28 +280,11 @@ def test_hash_multiple_filters(data_gen, conf):
         "hash_agg_table",
         'select count(a) filter (where c > 50),' +
         'count(b) filter (where c > 100),' +
-        # Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed
-        # 'avg(b) filter (where b > 20),' +
+        'avg(b) filter (where b > 20),' +
         'min(a), max(b) filter (where c > 250) from hash_agg_table group by a',
         conf)
 
 
-@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/155')
-@ignore_order
-@allow_non_gpu(
-    'HashAggregateExec', 'AggregateExpression',
-    'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
-    'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
-    'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')
-@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
-def test_hash_multiple_filters_fail(data_gen):
-    assert_gpu_and_cpu_are_equal_sql(
-        lambda spark : gen_df(spark, data_gen, length=100),
-        "hash_agg_table",
-        'select avg(b) filter (where b > 20) from hash_agg_table group by a',
-        _no_nans_float_conf_partial)
-
-
 @ignore_order
 @allow_non_gpu('HashAggregateExec', 'AggregateExpression', 'AttributeReference', 'Alias', 'Max',
                'KnownFloatingPointNormalized', 'NormalizeNaNAndZero')
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
index d65256b764d..3eed37be9fe 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -20,7 +20,7 @@ import ai.rapids.cudf
 import com.nvidia.spark.rapids._
 
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExprId, ImplicitCastInputTypes, Literal}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExprId, ImplicitCastInputTypes}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Complete, Final, Partial, PartialMerge}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BooleanType, DataType, DoubleType, LongType, NumericType, StructType}
@@ -76,8 +76,19 @@ case class GpuAggregateExpression(origAggregateFunction: GpuAggregateFunction,
 case class WrappedAggFunction(aggregateFunction: GpuAggregateFunction, filter: Expression)
   extends GpuDeclarativeAggregate {
   override val inputProjection: Seq[GpuExpression] = {
-    val caseWhenExpressions = aggregateFunction.inputProjection.map {ip =>
-      GpuCaseWhen(Seq((filter, ip)))
+    val caseWhenExpressions = aggregateFunction.inputProjection.map { ip =>
+      // Special-case average: when the filter produces a null, the expected
+      // values for the (sum, count) buffers are (0.0, 0), not nulls.
+      val initialValue: Expression =
+        origAggregateFunction match {
+          case _: GpuAverage => ip.dataType match {
+            case doubleType: DoubleType => GpuLiteral(0D, doubleType)
+            case _: LongType => GpuLiteral(0L, LongType)
+          }
+          case _ => GpuLiteral(null, ip.dataType)
+        }
+      val filterConditional = GpuCaseWhen(Seq((filter, ip)))
+      GpuCaseWhen(Seq((GpuIsNotNull(filterConditional), filterConditional)), Some(initialValue))
     }
     caseWhenExpressions
   }
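
Why the substituted initial values matter: Spark plans avg as a pair of partial buffers, a double sum and a long count, which is what the DoubleType/LongType match above distinguishes. The old projection, a bare CASE WHEN filter THEN input END, turned every filtered-out row into a null, and in partial-only mode those null buffers flow straight into the CPU's final aggregation, which is where the incorrect averages came from. Below is a minimal sketch of the rewritten projection in plain Spark; it deliberately uses stock when/otherwise rather than the plugin's GpuCaseWhen/GpuLiteral, and borrows the longs/more_longs columns from the nullDf fixture added later in this patch, so it mirrors the shape of the fix, not its exact expressions:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object FilterProjectionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]").appName("filter-projection-sketch").getOrCreate()
        import spark.implicits._

        val df = Seq[(java.lang.Long, java.lang.Long)](
          (100L, 15L), (100L, null)).toDF("longs", "more_longs")

        // Inner conditional, as before the fix: the value survives only when
        // the filter holds; every other row becomes null.
        val passed = when($"more_longs" > 2, $"more_longs")

        // Outer conditional, one per partial buffer: replace null with the
        // buffer's initial value so filtered-out rows contribute nothing.
        val sumIn = when(passed.isNotNull, passed.cast("double")).otherwise(lit(0.0))
        val countIn = when(passed.isNotNull, lit(1L)).otherwise(lit(0L))

        // 0.0 and 0 are identities for sum, so rebuilding the average from
        // these buffers matches avg(more_longs) FILTER (WHERE more_longs > 2).
        df.groupBy($"longs")
          .agg(sum(sumIn).as("s"), sum(countIn).as("c"))
          .select($"longs", ($"s" / $"c").as("avg_filtered"))
          .show()

        spark.stop()
      }
    }

Spark's Divide returns null for a zero divisor, so a group where no row passes the filter still comes out as a null average, matching FILTER semantics.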
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
index e9bfe114b73..35baef7dd18 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
@@ -1635,4 +1635,45 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
       .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) {
     frame => frame.groupBy(col("double")).agg(sum(col("int")))
   }
+
+  testSparkResultsAreEqual("Agg expression with filter avg with nulls", nullDf,
+    execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference",
+      "Alias", "Average", "Count", "Cast"),
+    conf = partialOnlyConf, repart = 2) {
+    frame => frame.createOrReplaceTempView("testTable")
+      frame.sparkSession.sql(
+        s"""
+           | SELECT
+           |   avg(more_longs) filter (where more_longs > 2)
+           | FROM testTable
+           | group by longs
+           |""".stripMargin)
+  }
+
+  testSparkResultsAreEqual("Agg expression with filter count with nulls", nullDf,
+    execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression",
+      "AttributeReference", "Alias", "Count", "Cast"),
+    conf = partialOnlyConf, repart = 2) {
+    frame => frame.createOrReplaceTempView("testTable")
+      frame.sparkSession.sql(
+        s"""
+           | SELECT
+           |   count(more_longs) filter (where more_longs > 2)
+           | FROM testTable
+           | group by longs
+           |""".stripMargin)
+  }
+
+  testSparkResultsAreEqual("Agg expression with filter sum with nulls", nullDf,
+    execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference",
+      "Alias", "Sum", "Cast"), conf = partialOnlyConf, repart = 2) {
+    frame => frame.createOrReplaceTempView("testTable")
+      frame.sparkSession.sql(
+        s"""
+           | SELECT
+           |   sum(more_longs) filter (where more_longs > 2)
+           | FROM testTable
+           | group by longs
+           |""".stripMargin)
+  }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index eb9104fa8cf..101e8938ab5 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -1234,6 +1234,14 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
     ).toDF("doubles")
   }
 
+  def nullDf(session: SparkSession): DataFrame = {
+    import session.sqlContext.implicits._
+    Seq[(java.lang.Long, java.lang.Long)](
+      (100L, 15L),
+      (100L, null)
+    ).toDF("longs", "more_longs")
+  }
+
   def mixedDoubleDf(session: SparkSession): DataFrame = {
     import session.sqlContext.implicits._
     Seq[(java.lang.Double, java.lang.Double)](
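
The three new suite tests pin the regression at the GPU-partial/CPU-final boundary: partialOnlyConf stops the plugin from replacing the final aggregation, which is why CPU classes such as HashAggregateExec, Average, Count, and Sum must be allowed via execsAllowedNonGpu. The conf's definition is not part of this diff; a hedged sketch of what it presumably looks like, assuming the plugin's spark.rapids.sql.hashAgg.replaceMode knob (the same one the python tests drive through _no_nans_float_conf_partial) and the RapidsConf.ENABLE_FLOAT_AGG key seen in the suite context above:

    import org.apache.spark.SparkConf

    // Assumed reconstruction, not shown in this patch: replace only the
    // Partial-mode hash aggregate on the GPU and leave Final to the CPU.
    val partialOnlyConf = new SparkConf()
      .set("spark.rapids.sql.hashAgg.replaceMode", "partial")
      .set("spark.rapids.sql.variableFloatAgg.enabled", "true")

With repart = 2, nullDf's rows can land in different partitions, so the CPU final step genuinely merges a partial buffer built from the filtered-out null row with one built from the surviving row, which is exactly the case that used to produce wrong averages.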