ANSI check for aggregates (#3597)
* ANSI fallbacks for most aggregates

* Make use of properties in the aggregates to control fallback

* Make aggregate metas all subclass from AggExprMeta

* Minor cleanup

Signed-off-by: Alessandro Bellina <[email protected]>

* Split up the test_hash avg_nulls tests into variants with and without ANSI enabled
abellina authored Sep 22, 2021
1 parent 6eb74c2 commit fd4c9bc
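
The bullets above describe a property-driven design: every aggregate meta now extends a common AggExprMeta base class, overrides tagAggForGpu() instead of tagExprForGpu(), and exposes an ansiTypeToCheck property that tells the shared code whether the aggregate can honor ANSI semantics on the GPU. The sketch below reconstructs that idea under stated assumptions, not as the plugin code: AggExprMeta, tagAggForGpu, ansiTypeToCheck, and willNotWorkOnGpu appear in the diff, while the stand-in RapidsMetaSketch trait, the ansiEnabled constructor flag, and the exact check logic are illustrative.

import org.apache.spark.sql.types.DataType

// Hedged sketch only: stand-in types so the snippet compiles on its own.
// The real RapidsMeta hierarchy carries far more state than this trait.
trait RapidsMetaSketch {
  private var reasons: List[String] = Nil
  def willNotWorkOnGpu(because: String): Unit = { reasons = because :: reasons }
  def canThisBeReplaced: Boolean = reasons.isEmpty
}

abstract class AggExprMetaSketch(ansiEnabled: Boolean) extends RapidsMetaSketch {
  // Subclasses override this hook instead of tagExprForGpu, so the shared
  // ANSI check below always runs and cannot be skipped by a subclass.
  def tagAggForGpu(): Unit = {}

  // None means "not supported under ANSI mode, no matter the type";
  // Some(t) means the base class should vet type t for ANSI safety.
  val ansiTypeToCheck: Option[DataType]

  final def tagExprForGpu(): Unit = {
    if (ansiEnabled && ansiTypeToCheck.isEmpty) {
      willNotWorkOnGpu("ANSI mode is enabled and this aggregate does not support it")
    }
    tagAggForGpu()
  }
}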
Showing 8 changed files with 229 additions and 56 deletions.
127 changes: 104 additions & 23 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -182,6 +182,14 @@
_grpkey_floats_with_nulls_and_nans]


# Used to test ANSI-mode fallback
_no_overflow_ansi_gens = [
    ByteGen(min_val = 1, max_val = 10, special_cases=[]),
    ShortGen(min_val = 1, max_val = 100, special_cases=[]),
    IntegerGen(min_val = 1, max_val = 1000, special_cases=[]),
    LongGen(min_val = 1, max_val = 3000, special_cases=[])]


def get_params(init_list, marked_params=[]):
"""
A method to build the test inputs along with their passed in markers to allow testing
@@ -259,29 +267,6 @@ def test_hash_grpby_avg(data_gen, conf):
        conf=conf
    )

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('ansi_enabled', ['true', 'false'])
def test_hash_grpby_avg_nulls(data_gen, conf, ansi_enabled):
    local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': ansi_enabled})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
          .agg(f.avg('c')),
        conf=local_conf
    )

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('ansi_enabled', ['true', 'false'])
def test_hash_reduction_avg_nulls(data_gen, conf, ansi_enabled):
    local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': ansi_enabled})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
          .agg(f.avg('c')),
        conf=local_conf
    )

# tracks https://github.com/NVIDIA/spark-rapids/issues/154
@approximate_float
@@ -1088,3 +1073,99 @@ def do_it(spark):
        df = two_col_df(spark, StringGen('k{1,5}'), ArrayGen(MapGen(StringGen('a{1,5}', nullable=False), StringGen('[ab]{1,5}'))))
        return df.groupBy('a').agg(f.min(df.b[1]["a"]))
    assert_gpu_and_cpu_are_equal_collect(do_it)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg_nulls(data_gen, conf):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
          .agg(f.avg('c')),
        conf=conf
    )

@ignore_order
@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
               'HashPartitioning', 'ShuffleExchangeExec', 'Average')
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg_nulls_ansi(data_gen, conf):
    local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': 'true'})
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
          .agg(f.avg('c')),
        'Average',
        conf=local_conf
    )

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_reduction_avg_nulls(data_gen, conf):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
          .agg(f.avg('c')),
        conf=conf
    )

@ignore_order
@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
               'HashPartitioning', 'ShuffleExchangeExec', 'Average')
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_reduction_avg_nulls_ansi(data_gen, conf):
    local_conf = copy_and_update(conf, {'spark.sql.ansi.enabled': 'true'})
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
          .agg(f.avg('c')),
        'Average',
        conf=local_conf
    )


@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
               'HashPartitioning', 'ShuffleExchangeExec', 'Sum')
@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_sum_fallback_when_ansi_enabled(data_gen):
    def do_it(spark):
        df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
        return df.groupBy('a').agg(f.sum("b"))

    assert_gpu_fallback_collect(do_it, 'Sum',
        conf={'spark.sql.ansi.enabled': 'true'})


@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression', 'Cast',
               'HashPartitioning', 'ShuffleExchangeExec', 'Average')
@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_avg_fallback_when_ansi_enabled(data_gen):
    def do_it(spark):
        df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
        return df.groupBy('a').agg(f.avg("b"))

    assert_gpu_fallback_collect(do_it, 'Average',
        conf={'spark.sql.ansi.enabled': 'true'})


@allow_non_gpu('HashAggregateExec', 'Alias', 'AggregateExpression',
               'HashPartitioning', 'ShuffleExchangeExec', 'Count', 'Literal')
@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_count_fallback_when_ansi_enabled(data_gen):
    def do_it(spark):
        df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
        return df.groupBy('a').agg(f.count("b"), f.count("*"))

    assert_gpu_fallback_collect(do_it, 'Count',
        conf={'spark.sql.ansi.enabled': 'true'})


@pytest.mark.parametrize('data_gen', _no_overflow_ansi_gens, ids=idfn)
def test_no_fallback_when_ansi_enabled(data_gen):
    def do_it(spark):
        df = gen_df(spark, [('a', data_gen), ('b', data_gen)], length=100)
        # coalescing because first/last are not deterministic otherwise
        df = df.coalesce(1).orderBy("a", "b")
        return df.groupBy('a').agg(f.first("b"), f.last("b"), f.min("b"), f.max("b"))

    assert_gpu_and_cpu_are_equal_collect(do_it,
        conf={'spark.sql.ansi.enabled': 'true'})
@@ -228,12 +228,16 @@ class Spark320Shims extends Spark32XShims {
      Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp + TypeSig.NULL,
        TypeSig.numericAndInterval + TypeSig.NULL))),
      (a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
        override def tagExprForGpu(): Unit = {
        override def tagAggForGpu(): Unit = {
          val dataType = a.child.dataType
          GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
        }

        override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
        override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
          GpuAverage(childExprs.head)

        // Average is not supported in ANSI mode right now, no matter the type
        override val ansiTypeToCheck: Option[DataType] = None
      }),
    GpuOverrides.expr[Abs](
      "Absolute value",
@@ -310,12 +310,16 @@ abstract class SparkBaseShims extends Spark30XShims {
        TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
        Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
      (a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
        override def tagExprForGpu(): Unit = {
        override def tagAggForGpu(): Unit = {
          val dataType = a.child.dataType
          GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
        }

        override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
        override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
          GpuAverage(childExprs.head)

        // Average is not supported in ANSI mode right now, no matter the type
        override val ansiTypeToCheck: Option[DataType] = None
      }),
    GpuOverrides.expr[Abs](
      "Absolute value",
@@ -273,12 +273,16 @@ abstract class SparkBaseShims extends Spark30XShims {
        TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
        Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
      (a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
        override def tagExprForGpu(): Unit = {
        override def tagAggForGpu(): Unit = {
          val dataType = a.child.dataType
          GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
        }

        override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
        override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
          GpuAverage(childExprs.head)

        // Average is not supported in ANSI mode right now, no matter the type
        override val ansiTypeToCheck: Option[DataType] = None
      }),
    GpuOverrides.expr[Abs](
      "Absolute value",
@@ -210,12 +210,16 @@ abstract class SparkBaseShims extends Spark30XShims {
        TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
        Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
      (a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
        override def tagExprForGpu(): Unit = {
        override def tagAggForGpu(): Unit = {
          val dataType = a.child.dataType
          GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
        }

        override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
        override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
          GpuAverage(childExprs.head)

        // Average is not supported in ANSI mode right now, no matter the type
        override val ansiTypeToCheck: Option[DataType] = None
      }),
    GpuOverrides.expr[Abs](
      "Absolute value",
@@ -208,12 +208,16 @@ abstract class SparkBaseShims extends Spark30XShims {
        TypeSig.DOUBLE, TypeSig.DOUBLE + TypeSig.DECIMAL_128_FULL,
        Seq(ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.numeric))),
      (a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
        override def tagExprForGpu(): Unit = {
        override def tagAggForGpu(): Unit = {
          val dataType = a.child.dataType
          GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
        }

        override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
        override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
          GpuAverage(childExprs.head)

        // Average is not supported in ANSI mode right now, no matter the type
        override val ansiTypeToCheck: Option[DataType] = None
      }),
    GpuOverrides.expr[Abs](
      "Absolute value",
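
Taken together, the Python tests and the repeated shim changes pin down the contract: Sum, Average, and Count fall back to the CPU under ANSI mode, while min/max/first/last stay on the GPU. Continuing the sketch from above, the subclass wiring below is illustrative; only Average's ansiTypeToCheck = None mirrors the diff.

import org.apache.spark.sql.types.{DataType, LongType}

// Illustrative subclasses of the AggExprMetaSketch defined earlier;
// the demo object is an assumption, not plugin code.
class AverageMetaSketch(ansiEnabled: Boolean) extends AggExprMetaSketch(ansiEnabled) {
  // Average is not supported in ANSI mode right now, no matter the type
  override val ansiTypeToCheck: Option[DataType] = None
}

class MinMetaSketch(ansiEnabled: Boolean, inputType: DataType)
    extends AggExprMetaSketch(ansiEnabled) {
  // Min cannot overflow, so under this sketch it stays on the GPU in ANSI mode
  override val ansiTypeToCheck: Option[DataType] = Some(inputType)
}

object FallbackDemo extends App {
  val avg = new AverageMetaSketch(ansiEnabled = true)
  avg.tagExprForGpu()
  println(avg.canThisBeReplaced)  // false: Average falls back under ANSI mode
  val min = new MinMetaSketch(ansiEnabled = true, inputType = LongType)
  min.tagExprForGpu()
  println(min.canThisBeReplaced)  // true: min/max stay on the GPU
}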