From b7fb94af4f22bb92868aebaf898429153253d6fd Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 25 Mar 2021 11:50:38 -0500 Subject: [PATCH] Add in murmur3 support for float, double, date and timestamp (#2017) Signed-off-by: Robert (Bobby) Evans --- docs/supported_ops.md | 8 +++---- .../src/main/python/repart_test.py | 8 +++++-- .../nvidia/spark/rapids/GpuOverrides.scala | 7 ++---- .../spark/sql/rapids/HashFunctions.scala | 23 +------------------ 4 files changed, 13 insertions(+), 33 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index f7396ba0f91..d39cf62d1e0 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -9719,10 +9719,10 @@ Accelerator support is described below. S S S -NS -NS -NS -NS +S +S +S +S* S S* S diff --git a/integration_tests/src/main/python/repart_test.py b/integration_tests/src/main/python/repart_test.py index de038fdd206..7225ab451a1 100644 --- a/integration_tests/src/main/python/repart_test.py +++ b/integration_tests/src/main/python/repart_test.py @@ -81,8 +81,10 @@ def test_repartion_df(num_parts, length): ([('a', short_gen)], ['a']), ([('a', int_gen)], ['a']), ([('a', long_gen)], ['a']), - pytest.param(([('a', float_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')), - pytest.param(([('a', double_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')), + ([('a', float_gen)], ['a']), + ([('a', double_gen)], ['a']), + ([('a', timestamp_gen)], ['a']), + ([('a', date_gen)], ['a']), ([('a', decimal_gen_default)], ['a']), ([('a', decimal_gen_neg_scale)], ['a']), ([('a', decimal_gen_scale_precision)], ['a']), @@ -97,6 +99,8 @@ def test_repartion_df(num_parts, length): ([('a', int_gen), ('b', byte_gen)], ['a', 'b']), ([('a', long_gen), ('b', null_gen)], ['a', 'b']), ([('a', byte_gen), ('b', boolean_gen), ('c', short_gen)], ['a', 'b', 'c']), + ([('a', float_gen), ('b', double_gen), ('c', short_gen)], ['a', 'b', 'c']), + ([('a', timestamp_gen), ('b', date_gen), ('c', int_gen)], ['a', 'b', 'c']), ([('a', short_gen), ('b', string_gen), ('c', int_gen)], ['a', 'b', 'c']), ([('a', decimal_gen_default), ('b', decimal_gen_64bit), ('c', decimal_gen_scale_precision)], ['a', 'b', 'c']), ], ids=idfn) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index f6e2e793886..4902fecfe73 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2312,9 +2312,7 @@ object GpuOverrides { "Murmur3 hash operator", ExprChecks.projectNotLambda(TypeSig.INT, TypeSig.INT, repeatingParamCheck = Some(RepeatingParamCheck("input", - // Floating point values don't work because of -0.0 is not hashed properly - TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG + - TypeSig.STRING + TypeSig.NULL + TypeSig.DECIMAL, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all))), (a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = a.children @@ -2486,8 +2484,7 @@ object GpuOverrides { // TODO In 0.5 we should make the checks self documenting, and look more like what // SparkPlan and Expression support // https://github.com/NVIDIA/spark-rapids/issues/1915 - val sig = TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG + - TypeSig.STRING + TypeSig.NULL + TypeSig.DECIMAL + val sig = TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL hp.children.foreach { child => sig.tagExprParam(this, child, "hash_key") } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala index 16b7026b441..d383da9c288 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala @@ -38,28 +38,7 @@ case class GpuMd5(child: Expression) object GpuMurmur3Hash extends Arm { def compute(batch: ColumnarBatch, boundExpr: Seq[Expression], seed: Int = 42): ColumnVector = { - val newExprs = boundExpr.map { expr => - expr.dataType match { - case ByteType | ShortType => - GpuCast(expr, IntegerType) - case DoubleType => - // We have to normalize the NaNs, but not zeros - // however the current cudf code does the wrong thing for -0.0 - // https://github.com/NVIDIA/spark-rapids/issues/1914 - GpuIf(GpuIsNan(expr), GpuLiteral(Double.NaN, DoubleType), expr) - case FloatType => - // We have to normalize the NaNs, but not zeros - // however the current cudf code does the wrong thing for -0.0 - // https://github.com/NVIDIA/spark-rapids/issues/1914 - GpuIf(GpuIsNan(expr), GpuLiteral(Float.NaN, FloatType), expr) - case dt: DecimalType if dt.precision <= DType.DECIMAL64_MAX_PRECISION => - // For these values it is just hashing it as a long - GpuUnscaledValue(expr) - case _ => - expr - } - } - withResource(GpuProjectExec.project(batch, newExprs)) { args => + withResource(GpuProjectExec.project(batch, boundExpr)) { args => val bases = GpuColumnVector.extractBases(args) ColumnVector.spark32BitMurmurHash3(seed, bases.toArray[ColumnView]) }