Support approx_percentile in reduction context #4961

Merged: 4 commits, Mar 29, 2022
integration_tests/src/main/python/hash_aggregate_test.py (40 changes: 29 additions & 11 deletions)
@@ -1300,6 +1300,14 @@ def do_it(spark):
        return df.groupBy('a').agg(f.min(df.b[1]["a"]))
    assert_gpu_and_cpu_are_equal_collect(do_it)

+@incompat
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_reduction(aqe_enabled):
+    conf = {'spark.sql.adaptive.enabled': aqe_enabled}
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('v', DoubleGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf, reduction = True)

@incompat
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_byte(aqe_enabled):
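For context on what the new test_hash_groupby_approx_percentile_reduction test above exercises: in a group-by context approx_percentile produces one row per key, while in a reduction context it collapses the whole column into a single row. A minimal sketch of the two contexts in plain PySpark (assumes an existing SparkSession; the view name t mirrors the test helpers):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A small two-column table: grouping key k, double-valued v.
df = spark.range(100).selectExpr("id % 3 as k", "cast(id as double) as v")
df.createOrReplaceTempView("t")

# Group-by context: one approximate median per key (three rows out).
spark.sql("select k, approx_percentile(v, 0.5) as the_percentile "
          "from t group by k order by k").show()

# Reduction context: a single approximate median over the whole column
# (one row out). This is the case the PR enables on the GPU.
spark.sql("select approx_percentile(v, 0.5) as the_percentile from t").show()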
@@ -1434,11 +1442,11 @@ def test_hash_groupby_approx_percentile_decimal128_single():
# results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
# compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
# of the CPU numbers
-def compare_percentile_approx(df_fun, percentiles, conf = {}):
+def compare_percentile_approx(df_fun, percentiles, conf = {}, reduction = False):

    # create SQL statements for exact and approx percentiles
-    p_exact_sql = create_percentile_sql("percentile", percentiles)
-    p_approx_sql = create_percentile_sql("approx_percentile", percentiles)
+    p_exact_sql = create_percentile_sql("percentile", percentiles, reduction)
+    p_approx_sql = create_percentile_sql("approx_percentile", percentiles, reduction)

    def run_exact(spark):
        df = df_fun(spark)
@@ -1465,8 +1473,9 @@ def run_approx(spark):
        gpu_approx_result = approx_gpu[i]

        # assert that keys match
-        assert cpu_exact_result['k'] == cpu_approx_result['k']
-        assert cpu_exact_result['k'] == gpu_approx_result['k']
+        if not reduction:
+            assert cpu_exact_result['k'] == cpu_approx_result['k']
+            assert cpu_exact_result['k'] == gpu_approx_result['k']

        # extract the percentile result column
        exact_percentile = cpu_exact_result['the_percentile']
@@ -1501,13 +1510,22 @@ def run_approx(spark):
        else:
            assert abs(cpu_delta / gpu_delta) - 1 < 0.001

-def create_percentile_sql(func_name, percentiles):
-    if isinstance(percentiles, list):
-        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
-            func_name, ",".join(str(i) for i in percentiles))
+def create_percentile_sql(func_name, percentiles, reduction):
+    if reduction:
+        if isinstance(percentiles, list):
+            return """select {}(v, array({})) as the_percentile from t""".format(
+                func_name, ",".join(str(i) for i in percentiles))
+        else:
+            return """select {}(v, {}) as the_percentile from t""".format(
+                func_name, percentiles)
-    else:
-        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
-            func_name, percentiles)
+    else:
+        if isinstance(percentiles, list):
+            return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
+                func_name, ",".join(str(i) for i in percentiles))
+        else:
+            return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
+                func_name, percentiles)
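
As a quick sanity check, these are the strings the rewritten helper emits in each mode (worked out by hand from the function above; the argument values are arbitrary):

# Reduction mode: no key column and no grouping.
create_percentile_sql("approx_percentile", [0.05, 0.5], True)
# -> select approx_percentile(v, array(0.05,0.5)) as the_percentile from t

# Group-by mode: keyed, grouped, and ordered so results compare deterministically.
create_percentile_sql("approx_percentile", 0.5, False)
# -> select k, approx_percentile(v, 0.5) as the_percentile from t group by k order by k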


@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
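The accuracy assertion near the end of compare_percentile_approx is relative, not absolute: it compares the GPU's approximation error against the CPU's approximation error. A worked example of the arithmetic (values invented for illustration; this assumes cpu_delta and gpu_delta are each approximate result's distance from the exact CPU percentile, as the collapsed code suggests):

exact = 100.0        # exact percentile from the CPU 'percentile' function
cpu_approx = 101.0   # CPU approx_percentile result
gpu_approx = 101.05  # GPU approx_percentile result

cpu_delta = cpu_approx - exact   # 1.0
gpu_delta = gpu_approx - exact   # 1.05

# The GPU result only has to track the CPU approximation, not the exact answer.
assert abs(cpu_delta / gpu_delta) - 1 < 0.001   # 0.9524 - 1 < 0.001, passes
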
@@ -17,7 +17,7 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf
-import ai.rapids.cudf.GroupByAggregation
+import ai.rapids.cudf.{GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.shims.ShimExpression

@@ -178,8 +178,11 @@ case class ApproxPercentileFromTDigestExpr(

class CudfTDigestUpdate(accuracyExpression: GpuLiteral)
  extends CudfAggregate {
-  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
-    throw new UnsupportedOperationException("TDigest is not yet supported in reduction")

+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (col: cudf.ColumnVector) =>
+      col.reduce(ReductionAggregation.createTDigest(CudfTDigest.accuracy(accuracyExpression)))

  override lazy val groupByAggregate: GroupByAggregation =
    GroupByAggregation.createTDigest(CudfTDigest.accuracy(accuracyExpression))
  override val name: String = "CudfTDigestUpdate"
@@ -189,8 +192,10 @@ class CudfTDigestUpdate(accuracyExpression: GpuLiteral)
class CudfTDigestMerge(accuracyExpression: GpuLiteral)
  extends CudfAggregate {

-  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = _ =>
-    throw new UnsupportedOperationException("TDigest is not yet supported in reduction")
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (col: cudf.ColumnVector) =>
+      col.reduce(ReductionAggregation.mergeTDigest(CudfTDigest.accuracy(accuracyExpression)))

  override lazy val groupByAggregate: GroupByAggregation =
    GroupByAggregation.mergeTDigest(CudfTDigest.accuracy(accuracyExpression))
  override val name: String = "CudfTDigestMerge"
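Taken together, the two changes above fill in the reduction half of the t-digest lifecycle: CudfTDigestUpdate now builds a t-digest from a raw input column via ReductionAggregation.createTDigest, and CudfTDigestMerge combines previously built digests via ReductionAggregation.mergeTDigest, mirroring the GroupByAggregation calls that were already in place for the group-by path. Before this change, both reduction paths threw UnsupportedOperationException.
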
@@ -3272,7 +3272,7 @@ object GpuOverrides extends Logging {
    }),
    expr[ApproximatePercentile](
      "Approximate percentile",
-      ExprChecks.groupByOnly(
+      ExprChecks.reductionAndGroupByAgg(
        // note that output can be single number or array depending on whether percentiles param
        // is a single number or an array
        TypeSig.gpuNumeric +
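With ExprChecks.reductionAndGroupByAgg in place, GpuOverrides will now accept ApproximatePercentile when it appears without a GROUP BY. A rough end-to-end check, as a sketch (assumes the RAPIDS Accelerator jar is on the Spark classpath; the config keys are real plugin settings, but the exact operator names in the plan output can vary by version):

from pyspark.sql import SparkSession

# approx_percentile is marked @incompat in the integration tests, so the
# incompatible-ops flag must be on for it to be placed on the GPU.
spark = (SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.enabled", "true")
    .config("spark.rapids.sql.incompatibleOps.enabled", "true")
    .getOrCreate())

spark.range(100).selectExpr("cast(id as double) as v").createOrReplaceTempView("t")

# If the reduction runs on the GPU, the physical plan should show GPU
# aggregate nodes (e.g. GpuHashAggregate) rather than CPU ones.
spark.sql("select approx_percentile(v, 0.5) from t").explain()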