Fix divide-by-zero in GpuAverage with ansi mode #2130

Merged: 4 commits, Apr 16, 2021
25 changes: 24 additions & 1 deletion integration_tests/src/main/python/hash_aggregate_test.py
@@ -204,6 +204,30 @@ def test_hash_grpby_avg(data_gen, conf):
conf=conf
)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('ansi_enabled', ['true', 'false'])
def test_hash_grpby_avg_nulls(data_gen, conf, ansi_enabled):
conf.update({'spark.sql.ansi.enabled': ansi_enabled})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a')
.agg(f.avg('c')),
conf=conf
)

@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@pytest.mark.parametrize('ansi_enabled', ['true', 'false'])
def test_hash_reduction_avg_nulls(data_gen, conf, ansi_enabled):
conf.update({'spark.sql.ansi.enabled': ansi_enabled})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.agg(f.avg('c')),
conf=conf
)
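The two new tests above assert that CPU and GPU agree on groups whose values are all null, with ANSI mode both on and off. As a rough sketch (plain Python, not Spark or RAPIDS code) of the semantics being checked: avg skips nulls, and an all-null group yields a null average rather than a divide-by-zero error, even with spark.sql.ansi.enabled=true.

```python
# Plain-Python sketch (not Spark/RAPIDS code) of the Average semantics
# the tests above exercise; None stands in for SQL null.
def avg(values):
    present = [v for v in values if v is not None]  # nulls are skipped
    if not present:
        return None  # count == 0: null result, never a divide-by-zero error
    return sum(present) / len(present)
```

Here avg([1.0, None, 3.0]) gives 2.0 while avg([None, None]) gives None, mirroring what assert_gpu_and_cpu_are_equal_collect checks on both engines.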

# tracks https://github.com/NVIDIA/spark-rapids/issues/154
@approximate_float
@ignore_order
@@ -302,7 +326,6 @@ def test_hash_query_max_with_multiple_distincts(data_gen, conf, parameterless):
'count(distinct b) from hash_agg_table group by a',
conf)


@ignore_order
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@@ -442,9 +442,13 @@ case class GpuAverage(child: Expression) extends GpuDeclarativeAggregate
// average = (0 + 1) and not 2 which is the rowcount of the projected column.
override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum),
new CudfSum(cudfCount))

// NOTE: this passes an optional boolean to `GpuDivide` to force it not to throw
// divide-by-zero exceptions, even when ansi mode is enabled in Spark.
// This is to conform with Spark's behavior in the Average aggregate function.
override lazy val evaluateExpression: GpuExpression = GpuDivide(
GpuCast(cudfSum, DoubleType),
GpuCast(cudfCount, DoubleType))
GpuCast(cudfCount, DoubleType), Some(false))
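The NOTE above is the heart of the fix: GpuAverage builds its final sum/count divide with the optional flag forced to false, so that divide never throws under ANSI mode. A hypothetical Python sketch of that control flow (function names invented for illustration):

```python
# Hypothetical sketch of the conditional divide-by-zero behavior:
# fail_on_error plays the role of GpuDivide's optional override,
# which the Average evaluation pins to False (Some(false) above).
def gpu_divide(numerator, denominator, fail_on_error):
    if denominator == 0:
        if fail_on_error:
            # ANSI-mode behavior for an ordinary division expression
            raise ArithmeticError("divide by zero")
        return None  # Spark's Average: empty group -> null, not an error
    return numerator / denominator

def average_evaluate(total, count):
    # the aggregate always disables the ANSI failure, conforming with Spark
    return gpu_divide(total, count, fail_on_error=False)
```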
Review comment (Collaborator): nit: best practice Option(false), but I think we can get away with a simple Boolean.

override lazy val initialValues: Seq[GpuLiteral] = Seq(
GpuLiteral(0.0, DoubleType),
@@ -269,7 +269,10 @@ object GpuDivModLike {
}

trait GpuDivModLike extends CudfBinaryArithmetic {
lazy val failOnError: Boolean = ShimLoader.getSparkShims.shouldFailDivByZero()
val failOnErrorOverride: Option[Boolean] = None

Review comment (Collaborator): let us try without failOnErrorOverride; just make failOnError non-lazy, so we can override it.

Review comment (Collaborator): You can override a lazy val:

override lazy val failOnError: Boolean = failOnErrorOverride.getOrElse(GpuDivModLike.failOnError)

But I am fine with keeping this as is.

Review comment (Collaborator): yes, I just meant you can't use lazy as a parameter.

lazy val failOnError: Boolean =
  failOnErrorOverride.getOrElse(ShimLoader.getSparkShims.shouldFailDivByZero())
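The pattern the reviewers converge on is an optional per-class override that falls back to the shim's default when unset. A small Python analogue (the shim call is a stand-in, and None plays the role of Scala's Option being empty):

```python
def should_fail_div_by_zero():
    # stand-in for ShimLoader.getSparkShims.shouldFailDivByZero()
    return True

class DivModLikeSketch:
    # None means "no override": fall back to the shim default,
    # mirroring failOnErrorOverride.getOrElse(...)
    fail_on_error_override = None

    @property
    def fail_on_error(self):
        if self.fail_on_error_override is None:
            return should_fail_div_by_zero()
        return self.fail_on_error_override

class DivideSketch(DivModLikeSketch):
    def __init__(self, fail_on_error_override=None):
        # a subclass instance can pin the flag, as GpuAverage does
        self.fail_on_error_override = fail_on_error_override
```

With no override, DivideSketch().fail_on_error follows the shim default; passing False pins it off regardless of ANSI mode.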

override def nullable: Boolean = true

@@ -330,7 +333,8 @@ object GpuDivideUtil {
}

// This is for doubles and floats...
case class GpuDivide(left: Expression, right: Expression) extends GpuDivModLike {
case class GpuDivide(left: Expression, right: Expression,
override val failOnErrorOverride: Option[Boolean] = None) extends GpuDivModLike {
Review comment (Contributor): The change looks good, but I was curious why we are using a different pattern from Spark, which just has a plain boolean argument with a default value, rather than using an option. Was this necessary because of the way we're using the shim layer?

Review comment (Author): Yes, I tried other ways but couldn't think of something cleaner.

Review comment (Collaborator): I don't think we need the 3-valued logic of Option[Boolean]. I think we can do it almost like Spark: let us undo the change to GpuDivModLike, just make failOnError non-lazy, and define

case class GpuDivide(
  left: Expression, right: Expression,
  override val failOnError: Boolean = ShimLoader.getSparkShims.shouldFailDivByZero()
) extends GpuDivModLike {
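For contrast, the reviewer's Spark-style alternative above replaces the three-valued Option with a plain boolean whose default comes from the shim. A Python rendering needs a None sentinel, because Python evaluates default expressions once at function definition time, whereas the Scala default above is re-evaluated at each construction (class and function names here are illustrative):

```python
def should_fail_div_by_zero():
    # stand-in for the shim call; imagine its answer varies by Spark version
    return True

class SparkStyleDivide:
    def __init__(self, fail_on_error=None):
        # emulate a Scala per-construction default argument via a sentinel
        self.fail_on_error = (should_fail_div_by_zero()
                              if fail_on_error is None else fail_on_error)
```

Callers that want Average's behavior would construct SparkStyleDivide(fail_on_error=False); everyone else gets the shim-driven ANSI default.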

override def inputType: AbstractDataType = TypeCollection(DoubleType, DecimalType)

override def symbol: String = "/"