diff --git a/docs/configs.md b/docs/configs.md
index 062135abb38..900b3b87106 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -115,6 +115,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
 spark.rapids.sql.expression.Atan|`atan`|Inverse tangent|true|None|
 spark.rapids.sql.expression.Atanh|`atanh`|Inverse hyperbolic tangent|true|None|
 spark.rapids.sql.expression.AttributeReference| |References an input column|true|None|
+spark.rapids.sql.expression.BRound|`bround`|Round an expression to d decimal places using HALF_EVEN rounding mode|true|None|
 spark.rapids.sql.expression.BitwiseAnd|`&`|Returns the bitwise AND of the operands|true|None|
 spark.rapids.sql.expression.BitwiseNot|`~`|Returns the bitwise NOT of the operands|true|None|
 spark.rapids.sql.expression.BitwiseOr|`\|`|Returns the bitwise OR of the operands|true|None|
@@ -197,6 +198,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
 spark.rapids.sql.expression.RegExpReplace|`regexp_replace`|RegExpReplace support for string literal input patterns|true|None|
 spark.rapids.sql.expression.Remainder|`%`, `mod`|Remainder or modulo|true|None|
 spark.rapids.sql.expression.Rint|`rint`|Rounds up a double value to the nearest double equal to an integer|true|None|
+spark.rapids.sql.expression.Round|`round`|Round an expression to d decimal places using HALF_UP rounding mode|true|None|
 spark.rapids.sql.expression.RowNumber|`row_number`|Window function that returns the index for the row within the aggregation window|true|None|
 spark.rapids.sql.expression.Second|`second`|Returns the second component of the string/timestamp|true|None|
 spark.rapids.sql.expression.ShiftLeft|`shiftleft`|Bitwise shift left (<<)|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index a9856de427c..43ed59c44c5 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -1903,6 +1903,138 @@ Accelerator support is described below.
 NS +BRound +`bround` +Round an expression to d decimal places using HALF_EVEN rounding mode +None +project +value + +S +S +S +S +S +S + + + +S* + + + + + + + + + +scale + + + +S + + + + + + + + + + + + + + + + +result + +S +S +S +S +S +S + + + +S* + + + + + + + + + +lambda +value + +NS +NS +NS +NS +NS +NS + + + +NS + + + + + + + + + +scale + + + +NS + + + + + + + + + + + + + + + + +result + +NS +NS +NS +NS +NS +NS + + + +NS + + + + + + + + + BitwiseAnd `&` Returns the bitwise AND of the operands
@@ -10388,6 +10520,138 @@ Accelerator support is described below.
 +Round +`round` +Round an expression to d decimal places using HALF_UP rounding mode +None +project +value + +S +S +S +S +S +S + + + +S* + + + + + + + + + +scale + + + +S + + + + + + + + + + + + + + + + +result + +S +S +S +S +S +S + + + +S* + + + + + + + + + +lambda +value + +NS +NS +NS +NS +NS +NS + + + +NS + + + + + + + + + +scale + + + +NS + + + + + + + + + + + + + + + + +result + +NS +NS +NS +NS +NS +NS + + + +NS + + + + + + + + + RowNumber `row_number` Window function that returns the index for the row within the aggregation window
diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index 7d7dd0614db..356fd13855f 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -188,6 +188,32 @@ def test_shift_right_unsigned(data_gen):
                 'shiftrightunsigned(a, cast(null as INT))',
                 'shiftrightunsigned(a, b)'))
 
+# bround() with the default scale and with scales -1, 1 and 10, for each generator in round_gens.
+@incompat
+@approximate_float
+@pytest.mark.parametrize('data_gen', round_gens, ids=idfn)
+def test_decimal_bround(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark: unary_op_df(spark, data_gen).selectExpr(
+                'bround(a)',
+                'bround(a, -1)',
+                'bround(a, 1)',
+                'bround(a, 10)'),
+            conf=allow_negative_scale_of_decimal_conf)
+
+# round() with the default scale and with scales -1, 1 and 10, for each generator in round_gens.
+@incompat
+@approximate_float
+@pytest.mark.parametrize('data_gen', round_gens, ids=idfn)
+def test_decimal_round(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark: unary_op_df(spark, data_gen).selectExpr(
+                'round(a)',
+                'round(a, -1)',
+                'round(a, 1)',
+                'round(a, 10)'),
+            conf=allow_negative_scale_of_decimal_conf)
+
 @approximate_float
 @pytest.mark.parametrize('data_gen', double_gens, ids=idfn)
 def test_cbrt(data_gen):
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index ec0dd982019..56d0cee1c46 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -763,6 +763,9 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 # Include decimal type while testing equalTo and notEqualTo
 eq_gens_with_decimal_gen = eq_gens + decimal_gens
 
+# Generators for testing the round operators: numeric_gens plus decimal_gens
+round_gens = numeric_gens + decimal_gens
+
 date_gens = [date_gen]
 
 date_n_time_gens = [date_gen, timestamp_gen]
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 4a063a90831..4e60a95e974 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1729,6 +1729,26 @@ object GpuOverrides {
 
         override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
       }),
+    expr[BRound](
+      "Round an expression to d decimal places using HALF_EVEN rounding mode",
+      ExprChecks.binaryProjectNotLambda(
+        TypeSig.numeric, TypeSig.numeric,
+        ("value", TypeSig.numeric, TypeSig.numeric),
+        ("scale", TypeSig.lit(TypeEnum.INT), TypeSig.lit(TypeEnum.INT))),
+      (a, conf, p, r) => new BinaryExprMeta[BRound](a, conf, p, r) {
+        override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
+          GpuBRound(lhs, rhs)
+      }),
+    expr[Round](
+      "Round an expression to d decimal places using HALF_UP rounding mode",
+      ExprChecks.binaryProjectNotLambda(
+        TypeSig.numeric, TypeSig.numeric,
+        ("value", TypeSig.numeric, TypeSig.numeric),
+        ("scale", TypeSig.lit(TypeEnum.INT), TypeSig.lit(TypeEnum.INT))),
+      (a, conf, p, r) => new BinaryExprMeta[Round](a, conf, p, r) {
+        override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
+          GpuRound(lhs, rhs)
+      }),
     expr[PythonUDF](
       "UDF run in an external python process. Does not actually run on the GPU, but " +
         "the transfer of data to/from it can be accelerated.",
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala
index edcb8a3e6d2..0c7b316cd53 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala
@@ -18,10 +18,11 @@ package org.apache.spark.sql.rapids
 
 import java.io.Serializable
 
-import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar, UnaryOp}
-import com.nvidia.spark.rapids.{Arm, CudfBinaryExpression, CudfUnaryExpression, FloatUtils, GpuColumnVector, GpuUnaryExpression}
+import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, RoundMode, Scalar, UnaryOp}
+import com.nvidia.spark.rapids.{Arm, CudfBinaryExpression, CudfUnaryExpression, FloatUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuUnaryExpression}
+import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
 
-import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes}
+import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, ImplicitCastInputTypes}
 import org.apache.spark.sql.types._
 
 abstract class CudfUnaryMathExpression(name: String) extends GpuUnaryMathExpression(name)
@@ -369,6 +370,82 @@ abstract class CudfBinaryMathExpression(name: String) extends CudfBinaryExpressi
   override def dataType: DataType = DoubleType
 }
 
+/**
+ * Shared GPU implementation of Spark's `round` and `bround` expressions.
+ *
+ * `child` is the value to round and `scale` the number of decimal places to round to.
+ * `scale` has to be a constant int, so only a vector or scalar `child` combined with a
+ * scalar `scale` is supported. Subclasses pick the rounding behavior via `roundMode`.
+ */
+abstract class GpuRoundBase(child: Expression, scale: Expression) extends GpuBinaryExpression
+  with Serializable with ImplicitCastInputTypes {
+
+  override def left: Expression = child
+  override def right: Expression = scale
+
+  /** The cudf rounding mode this expression rounds with. */
+  def roundMode: RoundMode
+
+  override lazy val dataType: DataType = child.dataType match {
+    // A requested scale larger than the input's would be scaling up, so the original
+    // scale is kept in that case, as `Decimal` does.
+    case DecimalType.Fixed(p, s) => DecimalType(p, math.min(s, _scale))
+    case t => t
+  }
+
+  // Avoid repeated evaluation since `scale` is a constant int.
+  // NOTE(review): a null scale is not checked for; `asInstanceOf[Int]` below unboxes a
+  // null to 0. Confirm a null scale cannot reach this expression.
+  private lazy val scaleV: Any = scale match {
+    case _: GpuExpression => scale.columnarEval(null)
+    case _ => scale.eval(EmptyRow)
+  }
+  private lazy val _scale: Int = scaleV.asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, IntegerType)
+
+  /** A column-valued scale is not supported by the round operator. */
+  private def vectorScaleNotSupported(): Nothing =
+    throw new IllegalArgumentException("lhs has to be a vector and rhs has to be a scalar for " +
+        "the round operator to work")
+
+  override def doColumnar(value: GpuColumnVector, scale: Scalar): ColumnVector = {
+    // Decimal results round to the scale of the result type; every other numeric type
+    // rounds to the scale that was passed in.
+    val decimalPlaces = dataType match {
+      case DecimalType.Fixed(_, s) => s
+      case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => scale.getInt
+      case _ => throw new IllegalArgumentException(s"Round operator doesn't support $dataType")
+    }
+    value.getBase.round(decimalPlaces, roundMode)
+  }
+
+  override def doColumnar(value: GpuColumnVector, scale: GpuColumnVector): ColumnVector =
+    vectorScaleNotSupported()
+
+  override def doColumnar(value: Scalar, scale: GpuColumnVector): ColumnVector =
+    vectorScaleNotSupported()
+
+  override def doColumnar(numRows: Int, value: Scalar, scale: Scalar): ColumnVector = {
+    // Expand the scalar value into a column so the (vector, scalar) path can be reused.
+    withResource(GpuColumnVector.from(value, numRows, left.dataType)) { expandedLhs =>
+      doColumnar(expandedLhs, scale)
+    }
+  }
+}
+
+/** `bround`: rounds using the HALF_EVEN rounding mode. */
+case class GpuBRound(child: Expression, scale: Expression) extends
+  GpuRoundBase(child, scale) {
+  override def roundMode: RoundMode = RoundMode.HALF_EVEN
+}
+
+/** `round`: rounds using the HALF_UP rounding mode. */
+case class GpuRound(child: Expression, scale: Expression) extends
+  GpuRoundBase(child, scale) {
+  override def roundMode: RoundMode = RoundMode.HALF_UP
+}
+
 case class GpuPow(left: Expression, right: Expression)
     extends CudfBinaryMathExpression("POWER") {
   override def binaryOp: BinaryOp = BinaryOp.POW