diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala index 4be623cc929..57379585802 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/FloatUtils.scala @@ -40,8 +40,10 @@ object FloatUtils extends Arm { def getNanScalar(dType: DType): Scalar = { if (dType == DType.FLOAT64) { Scalar.fromDouble(Double.NaN) - } else { + } else if (dType == DType.FLOAT32) { Scalar.fromFloat(Float.NaN) + } else { + throw new IllegalArgumentException("NaNs are only supported for Float types") } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index b5c7f6b89a8..80eefd4edf0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -79,6 +79,8 @@ class CastExprMeta[INPUT <: CastBase]( "converting floating point data types to strings and this can produce results that " + "differ from the default behavior in Spark. To enable this operation on the GPU, set" + s" ${RapidsConf.ENABLE_CAST_FLOAT_TO_STRING} to true.") + case (_: StringType, dt: DecimalType) if dt.precision + 1 > Decimal.MAX_LONG_DIGITS => + willNotWorkOnGpu(s"Because of rounding requirements we cannot support $dt on the GPU") case (_: StringType, _: FloatType | _: DoubleType) if !conf.isCastStringToFloatEnabled => willNotWorkOnGpu("Currently hex values aren't supported on the GPU. Also note " + "that casting from string to float types on the GPU returns incorrect results when " + @@ -97,17 +99,6 @@ class CastExprMeta[INPUT <: CastBase]( YearParseUtil.tagParseStringAsDate(conf, this) case (_: StringType, _: DateType) => YearParseUtil.tagParseStringAsDate(conf, this) - case (_: StringType, _: DecimalType) if !conf.isCastStringToDecimalEnabled => - // FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019 - willNotWorkOnGpu("Currently string to decimal type on the GPU might produce " + - "results which slightly differed from the correct results when the string represents " + - "any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For " + - "instance, the GPU returns 99999999999999987 when given the input string " + - "\"99999999999999999\". The cause of divergence is that we can not cast strings " + - "containing scientific notation to decimal directly. So, we have to cast strings " + - "to floats firstly. Then, cast floats to decimals. The first step may lead to " + - "precision loss. To enable this operation on the GPU, set " + - s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.") case (structType: StructType, StringType) => structType.foreach { field => recursiveTagExprForGpuCheck(field.dataType, StringType, depth + 1) @@ -160,9 +151,11 @@ object GpuCast extends Arm { val INVALID_INPUT_MESSAGE: String = "Column contains at least one value that is not in the " + "required range" - val INVALID_FLOAT_CAST_MSG: String = "At least one value is either null or is an invalid number" + val INVALID_NUMBER_MSG: String = "At least one value is either null or is an invalid number" - def sanitizeStringToFloat(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { + def sanitizeStringToFloat( + input: ColumnVector, + ansiEnabled: Boolean): ColumnVector = { // This regex gets applied after the transformation to normalize use of Inf and is // just strict enough to filter out known edge cases that would result in incorrect @@ -189,24 +182,24 @@ object GpuCast extends Arm { withResource(GpuScalar.from(null, DataTypes.StringType)) { nullString => // filter out strings containing breaking whitespace val withoutWhitespace = withResource(ColumnVector.fromStrings("\r", "\n")) { - verticalWhitespace => - withResource(stripped.contains(verticalWhitespace)) { - _.ifElse(nullString, stripped) - } + verticalWhitespace => + withResource(stripped.contains(verticalWhitespace)) { + _.ifElse(nullString, stripped) + } } - // replace all possible versions of "Inf" and "Infinity" with "Inf" - val inf = withResource(withoutWhitespace) { _ => + // replace all possible versions of "Inf" and "Infinity" with "Inf" + val inf = withResource(withoutWhitespace) { _ => withoutWhitespace.stringReplaceWithBackrefs( - "(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf") - } - // replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf" - val infWithoutPlus = withResource(inf) { _ => - withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search => - withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace => - inf.stringReplace(search, replace) + "(?:[iI][nN][fF])" + "(?:[iI][nN][iI][tT][yY])?", "Inf") + } + // replace "+Inf" with "Inf" because cuDF only supports "Inf" and "-Inf" + val infWithoutPlus = withResource(inf) { _ => + withResource(GpuScalar.from("+Inf", DataTypes.StringType)) { search => + withResource(GpuScalar.from("Inf", DataTypes.StringType)) { replace => + inf.stringReplace(search, replace) + } } } - } // filter out any strings that are not valid floating point numbers according // to the regex pattern val floatOrNull = withResource(infWithoutPlus) { _ => @@ -215,7 +208,7 @@ object GpuCast extends Arm { withResource(isFloat.all()) { allMatch => // Check that all non-null values are valid floats. if (allMatch.isValid && !allMatch.getBoolean) { - throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) + throw new NumberFormatException(GpuCast.INVALID_NUMBER_MSG) } infWithoutPlus.incRefCount() } @@ -502,13 +495,8 @@ object GpuCast extends Arm { } } case (StringType, dt: DecimalType) => - // To apply HALF_UP rounding strategy during casting to decimal, we firstly cast - // string to fp64. Then, cast fp64 to target decimal type to enforce HALF_UP rounding. - withResource(input.strip()) { trimmed => - withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp => - castFloatsToDecimal(fp, dt, ansiMode) - } - } + castStringToDecimal(input, ansiMode, dt) + case (ByteType | ShortType | IntegerType | LongType, dt: DecimalType) => castIntegralsToDecimal(input, dt, ansiMode) @@ -777,17 +765,62 @@ object GpuCast extends Arm { } } + def castStringToDecimal( + input: ColumnView, + ansiEnabled: Boolean, + dt: DecimalType): ColumnVector = { + // 1. Sanitize strings to make sure all are fixed points + // 2. Identify all fixed point values + // 3. Cast String to newDt (newDt = dt. precision + 1, dt.scale + 1). Promote precision if + // needed. This step is required so we can round up if needed in the final step + // 4. Now cast newDt to dt (Decimal to Decimal) + def getInterimDecimalPromoteIfNeeded(dt: DecimalType): DecimalType = { + if (dt.precision + 1 > Decimal.MAX_LONG_DIGITS) { + //We don't support Decimal 128 + throw new IllegalArgumentException("One or more values exceed the maximum supported " + + "Decimal precision while conversion") + } + DecimalType(dt.precision + 1, dt.scale + 1) + } + + val interimSparkDt = getInterimDecimalPromoteIfNeeded(dt) + val interimDt = DecimalUtil.createCudfDecimal(interimSparkDt) + val isFixedPoints = withResource(input.strip()) { + // We further filter out invalid values using the cuDF isFixedPoint method. + _.isFixedPoint(interimDt) + } + + withResource(isFixedPoints) { isFixedPoints => + if (ansiEnabled) { + withResource(isFixedPoints.all()) { allFixedPoints => + if (allFixedPoints.isValid && !allFixedPoints.getBoolean) { + throw new ArithmeticException(s"One or more values cannot be " + + s"represented as Decimal(${dt.precision}, ${dt.scale})") + } + } + } + // intermediate step needed so we can make sure we can round up + withResource(input.castTo(interimDt)) { interimDecimals => + withResource(Scalar.fromNull(interimDt)) { nulls => + withResource(isFixedPoints.ifElse(interimDecimals, nulls)) { decimals => + // cast Decimal to the Decimal that's needed + castDecimalToDecimal(decimals, interimSparkDt, dt, ansiEnabled) + } + } + } + } + } + def castStringToFloats( input: ColumnVector, ansiEnabled: Boolean, dType: DType): ColumnVector = { - // 1. convert the different infinities to "Inf"/"-Inf" which is the only variation cudf // understands // 2. identify the nans // 3. identify the floats. "nan", "null" and letters are not considered floats - // 4. if ansi is enabled we want to throw and exception if the string is neither float nor nan - // 5. convert everything thats not floats to null + // 4. if ansi is enabled we want to throw an exception if the string is neither float nor nan + // 5. convert everything that's not floats to null // 6. set the indices where we originally had nans to Float.NaN // // NOTE Limitation: "1.7976931348623159E308" and "-1.7976931348623159E308" are not considered @@ -805,7 +838,7 @@ object GpuCast extends Arm { withResource(nanOrFloat.all()) { allNanOrFloat => // Check that all non-null values are valid floats or NaN. if (allNanOrFloat.isValid && !allNanOrFloat.getBoolean) { - throw new NumberFormatException(GpuCast.INVALID_FLOAT_CAST_MSG) + throw new NumberFormatException(GpuCast.INVALID_NUMBER_MSG) } } } @@ -1146,7 +1179,7 @@ object GpuCast extends Arm { // We rely on containerDecimal to perform preciser rounding. So, we have to take extra // space cost of container into consideration when we run bound check. val containerScaleBound = DType.DECIMAL64_MAX_PRECISION - (dt.scale + 1) - val bound = math.pow(10, (dt.precision - dt.scale) min containerScaleBound) + val bound = math.pow(10, (dt.precision - dt.scale).min(containerScaleBound)) if (ansiMode) { assertValuesInRange(rounded, minValue = Scalar.fromDouble(-bound), diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala index 871e86a0152..aae1c2ed028 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala @@ -19,13 +19,13 @@ package org.apache.spark.sql.rapids import java.lang.invoke.SerializedLambda import com.nvidia.spark.RapidsUDF -import com.nvidia.spark.rapids.{DataFromReplacementRule, ExprMeta, GpuExpression, GpuRowBasedUserDefinedFunction, GpuUserDefinedFunction, RapidsConf, RapidsMeta, VersionUtils} +import com.nvidia.spark.rapids.{DataFromReplacementRule, ExprMeta, GpuExpression, GpuRowBasedUserDefinedFunction, GpuUserDefinedFunction, RapidsConf, RapidsMeta} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF, SpecializedGetters} import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType} +import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType} case class GpuScalaUDF( function: RapidsUDF, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala index 4ff7eba53a6..851dc03a7f3 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala @@ -408,22 +408,22 @@ class AnsiCastOpSuite extends GpuExpressionTestSuite { } testCastFailsForBadInputs("Test bad cast 1 from strings to floats", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c0").cast(FloatType)) } testCastFailsForBadInputs("Test bad cast 2 from strings to floats", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c1").cast(FloatType)) } testCastFailsForBadInputs("Test bad cast 1 from strings to double", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c0").cast(DoubleType)) } testCastFailsForBadInputs("Test bad cast 2 from strings to double", invalidFloatStringsDf, - msg = GpuCast.INVALID_FLOAT_CAST_MSG) { + msg = GpuCast.INVALID_NUMBER_MSG) { frame =>frame.select(col("c1").cast(DoubleType)) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 519b2d14673..2a63861334b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -875,12 +875,20 @@ class CastOpSuite extends GpuExpressionTestSuite { } test("cast string to decimal") { - List(-18, -10, -3, 0, 1, 5, 15).foreach { scale => - testCastToDecimal(DataTypes.StringType, scale, + List(-17, -10, -3, 0, 1, 5, 15).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale, precision = 17, customRandGenerator = Some(new scala.util.Random(1234L))) } } + test("cast string to decimal (fail)") { + assertThrows[IllegalArgumentException]( + List(-18, 18, 2, 32, 8).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale, + customRandGenerator = Some(new scala.util.Random(1234L))) + }) + } + test("cast string to decimal (include NaN/INF/-INF)") { def doubleStrings(ss: SparkSession): DataFrame = { val df1 = floatsAsStrings(ss).selectExpr("cast(c0 as Double) as col") @@ -888,7 +896,7 @@ class CastOpSuite extends GpuExpressionTestSuite { df1.unionAll(df2) } List(-10, -1, 0, 1, 10).foreach { scale => - testCastToDecimal(DataTypes.StringType, scale = scale, + testCastToDecimal(DataTypes.StringType, scale = scale, precision = 17, customDataGenerator = Some(doubleStrings)) } } @@ -898,15 +906,15 @@ class CastOpSuite extends GpuExpressionTestSuite { import ss.sqlContext.implicits._ column.toDF("col") } - testCastToDecimal(DataTypes.StringType, scale = 7, + testCastToDecimal(DataTypes.StringType, scale = 7, precision = 17, customDataGenerator = Some(specialGenerator(Seq("9999999999")))) - testCastToDecimal(DataTypes.StringType, scale = 2, + testCastToDecimal(DataTypes.StringType, scale = 2, precision = 17, customDataGenerator = Some(specialGenerator(Seq("999999999999999")))) - testCastToDecimal(DataTypes.StringType, scale = 0, + testCastToDecimal(DataTypes.StringType, scale = 0, precision = 17, customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) - testCastToDecimal(DataTypes.StringType, scale = -1, + testCastToDecimal(DataTypes.StringType, scale = -1, precision = 17, customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) - testCastToDecimal(DataTypes.StringType, scale = -10, + testCastToDecimal(DataTypes.StringType, scale = -10, precision = 17, customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) } @@ -915,7 +923,7 @@ class CastOpSuite extends GpuExpressionTestSuite { exponentsAsStringsDf(ss).select(col("c0").as("col")) } List(-10, -1, 0, 1, 10).foreach { scale => - testCastToDecimal(DataTypes.StringType, scale = scale, + testCastToDecimal(DataTypes.StringType, scale = scale, precision = 17, customDataGenerator = Some(exponentsAsStrings), ansiEnabled = true) }