From 92a1435a0a73013db478ec0865cf6dc6d43cdf1c Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 6 Jan 2022 15:23:28 -0800 Subject: [PATCH 1/4] Add in HashPartitioning support for decimal 128 Signed-off-by: Niranjan Artal --- integration_tests/src/main/python/data_gen.py | 4 ++-- .../src/main/python/hash_aggregate_test.py | 13 +++++++++---- integration_tests/src/main/python/join_test.py | 16 ++++++++-------- .../src/main/python/repart_test.py | 11 ++++++++++- .../com/nvidia/spark/rapids/GpuOverrides.scala | 18 +++++++++--------- .../com/nvidia/spark/rapids/aggregate.scala | 12 ++---------- .../sql/rapids/execution/GpuHashJoin.scala | 2 +- 7 files changed, 41 insertions(+), 35 deletions(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 5c3a8e33649..6d637b2f834 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -969,7 +969,7 @@ def copy_and_update(conf, *more_confs): all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen(), decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, - decimal_gen_64bit] + decimal_gen_64bit, decimal_gen_128bit, decimal_gen_36_5, decimal_gen_38_10] # Pyarrow will complain the error as below if the timestamp is out of range for both CPU and GPU, # so narrow down the time range to avoid exceptions causing test failures. diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 9aad3638dc3..9b7d075ebb5 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
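For context on the data_gen change above: appending decimal_gen_128bit, decimal_gen_36_5 and decimal_gen_38_10 to all_gen means every test list built from all_gen now includes DecimalType columns with precision above 18, i.e. values that no longer fit the 64-bit decimal path. A minimal PySpark sketch of such a column and of the hash-partitioned shuffle this series targets (the session setup, schema and literal value are illustrative, not part of the patch):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# precision 38 / scale 10 matches decimal_gen_38_10: too wide for 64-bit storage,
# so it takes the 128-bit decimal path.
schema = StructType([StructField("a", DecimalType(38, 10))])
df = spark.createDataFrame([(Decimal("1234567890123456789012345678.0123456789"),)], schema)

# repartition by the column inserts a HashPartitioning exchange, the operation
# this series allows the plugin to keep on the GPU for decimal 128 keys.
df.repartition(4, "a").explain()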
@@ -257,6 +257,11 @@ def get_params(init_list, marked_params=[]): ('b', DecimalGen(precision=5, scale=2)), ('c', DecimalGen(precision=8, scale=3))] +_grpkey_big_decimals = [ + ('a', RepeatSeqGen(DecimalGen(precision=32, scale=10, nullable=(True, 10.0)), length=50)), + ('b', DecimalGen(precision=20, scale=2)), + ('c', DecimalGen(precision=36, scale=5))] + _grpkey_short_mid_decimals = [ ('a', RepeatSeqGen(short_gen, length=50)), ('b', decimal_gen_18_3), @@ -296,7 +301,7 @@ def get_params(init_list, marked_params=[]): _grpkey_small_decimals] _init_list_no_nans_with_decimalbig = _init_list_no_nans + [ - _grpkey_small_decimals, _grpkey_short_mid_decimals, + _grpkey_small_decimals, _grpkey_big_decimals, _grpkey_short_mid_decimals, _grpkey_short_big_decimals, _grpkey_short_very_big_decimals, _grpkey_short_very_big_neg_scale_decimals] @@ -448,7 +453,7 @@ def test_exceptAll(data_gen): @approximate_float @ignore_order(local=True) @incompat -@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimal, ids=idfn) +@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimalbig, ids=idfn) @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) def test_hash_grpby_pivot(data_gen, conf): assert_gpu_and_cpu_are_equal_collect( @@ -456,7 +461,7 @@ def test_hash_grpby_pivot(data_gen, conf): .groupby('a') .pivot('b') .agg(f.sum('c')), - conf=conf) + conf = copy_and_update(allow_negative_scale_of_decimal_conf, conf)) @approximate_float @ignore_order(local=True) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index cd955c7e079..5b090228caa 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
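The new _grpkey_big_decimals entry gives the aggregation tests a grouping key, pivot column and value column that are all wider than 64 bits, and test_hash_grpby_pivot now runs over that list with the negative-scale decimal conf merged in. Outside the test harness, the query shape it exercises looks roughly like this (schema sizes mirror _grpkey_big_decimals; rows and session setup are illustrative):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# DECIMAL(32,10) grouping key, DECIMAL(20,2) pivot column, DECIMAL(36,5) value column.
schema = StructType([
    StructField("a", DecimalType(32, 10)),
    StructField("b", DecimalType(20, 2)),
    StructField("c", DecimalType(36, 5))])
rows = [(Decimal("1.0"), Decimal("10.00"), Decimal("100.00000")),
        (Decimal("1.0"), Decimal("20.00"), Decimal("200.00000")),
        (Decimal("2.0"), Decimal("10.00"), Decimal("300.00000"))]
df = spark.createDataFrame(rows, schema)

# Same shape as the pivot test: group by the wide decimal, pivot, sum.
df.groupby("a").pivot("b").agg(f.sum("c")).show()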
@@ -31,7 +31,7 @@ pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat]), decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, - decimal_gen_neg_scale, decimal_gen_64bit] + decimal_gen_neg_scale, decimal_gen_64bit] + decimal_128_gens all_gen_no_nulls = [StringGen(nullable=False), ByteGen(nullable=False), ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False), @@ -184,7 +184,7 @@ def do_join(spark): @allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero', 'ShuffleExchangeExec', 'HashPartitioning') @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', single_level_array_gens + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn) @pytest.mark.parametrize('join_type', all_join_types, ids=idfn) def test_sortmerge_join_wrong_key_fallback(data_gen, join_type): def do_join(spark): @@ -252,7 +252,7 @@ def do_join(spark): # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed -@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens, ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_cartesian_join(data_gen, batch_size): def do_join(spark): @@ -267,7 +267,7 @@ def do_join(spark): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.xfail(condition=is_databricks_runtime(), reason='https://github.com/NVIDIA/spark-rapids/issues/334') -@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens, ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_cartesian_join_special_case_count(data_gen, batch_size): def do_join(spark): @@ -295,7 +295,7 @@ def do_join(spark): # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed -@pytest.mark.parametrize('data_gen', all_gen + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_cartesian_join_with_condition(data_gen, batch_size): def do_join(spark): @@ -311,7 +311,7 @@ def do_join(spark): # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens, ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_broadcast_nested_loop_join(data_gen, batch_size): def do_join(spark): @@ -323,7 +323,7 @@ def do_join(spark): # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this 
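In join_test the decimal_128_gens list is appended to all_gen, so the equi-join tests pick up 128-bit decimal keys automatically; the explicit `+ decimal_128_gens` additions elsewhere are dropped to avoid duplicating cases, and they are removed from the wrong-key fallback test because such keys no longer fall back. A standalone sketch of the kind of join these tests cover (schema and values are illustrative):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([StructField("key", DecimalType(38, 10)),
                     StructField("v", DecimalType(36, 5))])
left = spark.createDataFrame([(Decimal("1.5"), Decimal("10.00000"))], schema)
right = spark.createDataFrame([(Decimal("1.5"), Decimal("20.00000"))], schema)

# The equality on "key" hash-partitions both sides by the 128-bit decimal column,
# the same HashPartitioning case this series enables on the GPU.
left.join(right, on="key", how="inner").show()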
@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens, ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_broadcast_nested_loop_join_special_case_count(data_gen, batch_size): def do_join(spark): diff --git a/integration_tests/src/main/python/repart_test.py b/integration_tests/src/main/python/repart_test.py index 8b8df12c2cb..9dce60fa882 100644 --- a/integration_tests/src/main/python/repart_test.py +++ b/integration_tests/src/main/python/repart_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -240,6 +240,14 @@ def test_round_robin_sort_fallback(data_gen): ([('a', decimal_gen_scale_precision)], ['a']), ([('a', decimal_gen_same_scale_precision)], ['a']), ([('a', decimal_gen_64bit)], ['a']), + ([('a', decimal_gen_64bit)], ['a']), + ([('a', decimal_gen_128bit)], ['a']), + ([('a', decimal_gen_30_2)], ['a']), + ([('a', decimal_gen_36_5)], ['a']), + ([('a', decimal_gen_36_neg5)], ['a']), + ([('a', decimal_gen_38_0)], ['a']), + ([('a', decimal_gen_38_10)], ['a']), + ([('a', decimal_gen_38_neg10)], ['a']), ([('a', string_gen)], ['a']), ([('a', null_gen)], ['a']), ([('a', StructGen([('c0', boolean_gen), ('c1', StructGen([('c1_0', byte_gen), ('c1_1', string_gen), ('c1_2', boolean_gen)]))]))], ['a']), @@ -256,6 +264,7 @@ def test_round_robin_sort_fallback(data_gen): ([('a', timestamp_gen), ('b', date_gen), ('c', int_gen)], ['a', 'b', 'c']), ([('a', short_gen), ('b', string_gen), ('c', int_gen)], ['a', 'b', 'c']), ([('a', decimal_gen_default), ('b', decimal_gen_64bit), ('c', decimal_gen_scale_precision)], ['a', 'b', 'c']), + ([('a', decimal_gen_128bit), ('b', decimal_gen_38_neg10), ('c', decimal_gen_36_5)], ['a', 'b', 'c']), ], ids=idfn) def test_hash_repartition_exact(gen, num_parts): data_gen = gen[0] diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 7ba1a268981..7faa0dfa4c8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2187,17 +2187,17 @@ object GpuOverrides extends Logging { expr[PivotFirst]( "PivotFirst operator", ExprChecks.reductionAndGroupByAgg( - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_64), + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL), TypeSig.all, Seq(ParamCheck( "pivotColumn", - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64) + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL) .withPsNote(TypeEnum.DOUBLE, nanAggPsNote) .withPsNote(TypeEnum.FLOAT, nanAggPsNote), TypeSig.all), ParamCheck("valueColumn", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL, TypeSig.all))), (pivot, conf, p, r) => new ImperativeAggExprMeta[PivotFirst](pivot, conf, p, r) { override def tagAggForGpu(): Unit = { @@ -2996,8 +2996,8 @@ object GpuOverrides extends Logging 
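The repart_test additions check that, after repartitioning by wide decimal keys, every row lands in exactly the partition implied by Spark's Murmur3 hash of the key, which is why the Murmur3Hash and HashPartitioning checks below must stay in sync. A rough standalone version of that property check, outside the test harness (data, partition count and column name are illustrative):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
num_parts = 4

schema = StructType([StructField("a", DecimalType(38, 10))])
df = spark.createDataFrame([(Decimal(i),) for i in range(100)], schema)

# HashPartitioning places a row in partition pmod(murmur3(key), num_parts); the SQL
# hash() function is the same Murmur3 expression, so the two columns must agree.
checked = (df.repartition(num_parts, "a")
             .withColumn("pid", f.spark_partition_id())
             .withColumn("expected", f.expr("pmod(hash(a), {})".format(num_parts))))
assert checked.filter("pid != expected").count() == 0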
{ "Murmur3 hash operator", ExprChecks.projectOnly(TypeSig.INT, TypeSig.INT, repeatingParamCheck = Some(RepeatingParamCheck("input", - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + TypeSig.STRUCT).nested(), - TypeSig.all))), + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + TypeSig.STRUCT).nested(), TypeSig.all))), (a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = a.children .map(GpuOverrides.wrapExpr(_, conf, Some(this))) @@ -3361,8 +3361,8 @@ object GpuOverrides extends Logging { "Hash based partitioning", // This needs to match what murmur3 supports. PartChecks(RepeatingParamCheck("hash_key", - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + TypeSig.STRUCT).nested(), - TypeSig.all)), + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + TypeSig.STRUCT).nested(), TypeSig.all)), (hp, conf, p, r) => new PartMeta[HashPartitioning](hp, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = hp.expressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) @@ -3693,7 +3693,7 @@ object GpuOverrides extends Logging { exec[ExpandExec]( "The backend for the expand operator", ExecChecks( - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 87041ddbb3b..30bc34bd144 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -22,7 +22,7 @@ import scala.annotation.tailrec import scala.collection.mutable import ai.rapids.cudf -import ai.rapids.cudf.{DType, NvtxColor} +import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode @@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter} import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil} -import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, MapType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType} import org.apache.spark.sql.vectorized.ColumnarBatch object AggregateUtils { @@ -828,14 +828,6 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( willNotWorkOnGpu("ArrayTypes or MapTypes in grouping expressions are not supported") } - val dec128Grouping = agg.groupingExpressions.exists(e => - TrampolineUtil.dataTypeExistsRecursively(e.dataType, - dt => dt.isInstanceOf[DecimalType] && - dt.asInstanceOf[DecimalType].precision > DType.DECIMAL64_MAX_PRECISION)) - if (dec128Grouping) { - willNotWorkOnGpu("grouping by a 128-bit decimal value is not currently supported") - } - tagForReplaceMode() if (agg.aggregateExpressions.exists(expr => expr.isDistinct) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala 
b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index e67c9ae81db..5a574016995 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -58,7 +58,7 @@ object JoinTypeChecks { val CONDITION = "condition" private[this] val cudfSupportedKeyTypes = - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + TypeSig.STRUCT).nested() + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT).nested() private[this] val sparkSupportedJoinKeyTypes = TypeSig.all - TypeSig.MAP.nested() private[this] val joinRideAlongTypes = From c68ccb9cb60deed0e47e4a56daf34b49e4215c18 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 6 Jan 2022 15:36:24 -0800 Subject: [PATCH 2/4] update docs --- docs/supported_ops.md | 56 +++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 376eb5c0ace..efac01fbba4 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -194,13 +194,13 @@ Accelerator supports are described below. S PS
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</td>
@@ -722,13 +722,13 @@ Accelerator supports are described below.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
@@ -743,13 +743,13 @@ Accelerator supports are described below.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
@@ -902,13 +902,13 @@ Accelerator supports are described below.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
@@ -923,13 +923,13 @@ Accelerator supports are described below.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
@@ -989,13 +989,13 @@ Accelerator supports are described below.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
@@ -1010,13 +1010,13 @@ Accelerator supports are described below.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, UDT</td>
@@ -8696,13 +8696,13 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</td>
@@ -15427,7 +15427,7 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
@@ -15448,7 +15448,7 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
@@ -15469,11 +15469,11 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</td>
@@ -15491,7 +15491,7 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
@@ -15512,7 +15512,7 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
@@ -15533,11 +15533,11 @@ are limited.
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</td>
@@ -17299,13 +17299,13 @@ as `a` don't show up in the table. They are controlled by the rules for
-<td>PS<br/>max DECIMAL precision of 18</td>
+<td>S</td>
-<td>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</td>
+<td>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</td>
NS From cbb64c73aadead5cd8bb495195078042b8283e66 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 10 Jan 2022 15:47:19 -0800 Subject: [PATCH 3/4] updates tests for ReplicateRows Signed-off-by: Niranjan Artal --- .../src/main/python/hash_aggregate_test.py | 15 ++------------- .../com/nvidia/spark/rapids/GpuOverrides.scala | 3 --- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 9b7d075ebb5..44d07eabe6c 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -424,27 +424,16 @@ def test_hash_avg_nulls_partial_only(data_gen): @approximate_float @ignore_order @incompat -@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimal, ids=idfn) +@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimalbig, ids=idfn) def test_intersectAll(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, data_gen, length=100).intersectAll(gen_df(spark, data_gen, length=100)), conf=allow_negative_scale_of_decimal_conf) -# Grouping by a 128-bit decimal value is not currently supported, so HashAggregateExec runs on -# CPU followed by expression replicateRows which supports decimal128. -@allow_non_gpu('HashAggregateExec', 'ShuffleExchangeExec') @approximate_float @ignore_order @incompat -@pytest.mark.parametrize('data_gen', [_grpkey_short_big_decimals], ids=idfn) -def test_intersectAll_decimal128(data_gen): - assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, data_gen, length=100).intersectAll(gen_df(spark, data_gen, length=100))) - -@approximate_float -@ignore_order -@incompat -@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimal, ids=idfn) +@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimalbig, ids=idfn) def test_exceptAll(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, data_gen, length=100).exceptAll(gen_df(spark, data_gen, length=100).filter('a != b')), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 7faa0dfa4c8..6dd3ba19688 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3107,9 +3107,6 @@ object GpuOverrides extends Logging { expr[ReplicateRows]( "Given an input row replicates the row N times", ExprChecks.projectOnly( - // The plan is optimized to run HashAggregate on the rows to be replicated. - // HashAggregateExec doesn't support grouping by 128-bit decimal value yet. 
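Patch 3 folds the separate decimal128 intersectAll test back into the common parametrization: with 128-bit decimal grouping keys supported, the aggregate-plus-ReplicateRows plan that intersectAll and exceptAll compile to no longer needs a CPU fallback. A standalone sketch of the operation those tests cover (data and session setup are illustrative):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([StructField("a", DecimalType(36, 5))])
df1 = spark.createDataFrame(
    [(Decimal("1.00000"),), (Decimal("1.00000"),), (Decimal("2.00000"),)], schema)
df2 = spark.createDataFrame(
    [(Decimal("1.00000"),), (Decimal("1.00000"),)], schema)

# intersectAll keeps duplicates, so Spark plans it as an aggregate over the keys
# followed by ReplicateRows; with decimal 128 grouping supported the whole plan
# can stay on the GPU.
df1.intersectAll(df2).show()  # two rows of 1.00000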
- // Issue to track decimal 128 support: https://github.com/NVIDIA/spark-rapids/issues/4410 TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.ARRAY + TypeSig.STRUCT), TypeSig.ARRAY.nested(TypeSig.all), From 2d1090c3e70d8e03ff62dc19cf1bc2ba242fcde9 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Wed, 19 Jan 2022 16:07:29 -0800 Subject: [PATCH 4/4] fix compile errors Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/GpuOverrides.scala | 14 +++++++------- .../spark/sql/rapids/execution/GpuHashJoin.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 66c3047b720..58f46a6b206 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2186,17 +2186,17 @@ object GpuOverrides extends Logging { expr[PivotFirst]( "PivotFirst operator", ExprChecks.reductionAndGroupByAgg( - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + - TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL), + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + + TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128), TypeSig.all, Seq(ParamCheck( "pivotColumn", - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL) + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128) .withPsNote(TypeEnum.DOUBLE, nanAggPsNote) .withPsNote(TypeEnum.FLOAT, nanAggPsNote), TypeSig.all), ParamCheck("valueColumn", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128, TypeSig.all))), (pivot, conf, p, r) => new ImperativeAggExprMeta[PivotFirst](pivot, conf, p, r) { override def tagAggForGpu(): Unit = { @@ -3021,7 +3021,7 @@ object GpuOverrides extends Logging { "Murmur3 hash operator", ExprChecks.projectOnly(TypeSig.INT, TypeSig.INT, repeatingParamCheck = Some(RepeatingParamCheck("input", - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.STRUCT).nested(), TypeSig.all))), (a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = a.children @@ -3396,7 +3396,7 @@ object GpuOverrides extends Logging { "Hash based partitioning", // This needs to match what murmur3 supports. 
PartChecks(RepeatingParamCheck("hash_key", - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.STRUCT).nested(), TypeSig.all)), (hp, conf, p, r) => new PartMeta[HashPartitioning](hp, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = @@ -3728,7 +3728,7 @@ object GpuOverrides extends Logging { exec[ExpandExec]( "The backend for the expand operator", ExecChecks( - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), TypeSig.all), (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)), diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index 79f1afb61ca..d54f3e86bc5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -59,7 +59,7 @@ object JoinTypeChecks { val CONDITION = "condition" private[this] val cudfSupportedKeyTypes = - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT).nested() + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.STRUCT).nested() private[this] val sparkSupportedJoinKeyTypes = TypeSig.all - TypeSig.MAP.nested() private[this] val joinRideAlongTypes =
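Taken together, the series lets Murmur3Hash, HashPartitioning, the hash join key checks and ExpandExec accept 128-bit decimals, and patch 4 renames DECIMAL_128_FULL to DECIMAL_128, apparently to track a TypeSig rename that landed in the meantime. A quick, informal way to eyeball the result on a cluster with the plugin installed is sketched below; spark.rapids.sql.enabled is the plugin's enable switch, while the exact Gpu* operator names printed by explain() can vary between plugin versions:

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
# Plugin on/off switch; only meaningful when the spark-rapids jar is on the classpath.
spark.conf.set("spark.rapids.sql.enabled", "true")

schema = StructType([StructField("a", DecimalType(38, 10))])
df = spark.createDataFrame([(Decimal(i),) for i in range(10)], schema)

# Before this series, hash partitioning by a DECIMAL(38,10) key fell back to the CPU;
# with it applied, the exchange in the printed plan should be the GPU version.
df.repartition(8, "a").explain()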