diff --git a/docs/configs.md b/docs/configs.md index 916d4d719e1..8abcf8f00a2 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -54,6 +54,7 @@ Name | Description | Default Value spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|false spark.rapids.sql.castFloatToIntegralTypes.enabled|Casting from floating point types to integral types on the GPU supports a slightly different range of values when using Spark 3.1.0 or later. Refer to the CAST documentation for more details.|false spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|false +spark.rapids.sql.castStringToDecimal.enabled|When set to true, enables casting from strings to decimal type on the GPU. Currently string to decimal type on the GPU might produce results which slightly differed from the correct results when the string represents any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given input string "99999999999999999". The cause of divergence is that we can not cast strings containing scientific notation to decimal directly. So, we have to cast strings to floats firstly. Then, cast floats to decimals. The first step may lead to precision loss.|false spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false spark.rapids.sql.castStringToInteger.enabled|When set to true, enables casting from strings to integer types (byte, short, int, long) on the GPU. Casting from string to integer types on the GPU returns incorrect results when the string represents a number larger than Long.MaxValue or smaller than Long.MinValue.|false spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 65f70def15d..0cdf6090132 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -18029,7 +18029,7 @@ and the accelerator produces the same result. S S* S -NS +S* S NS @@ -18433,7 +18433,7 @@ and the accelerator produces the same result. S S* S -NS +S* S NS diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 380f4a9a40c..f996df38307 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -78,6 +78,18 @@ class CastExprMeta[INPUT <: CastBase]( "for more details. To enable this operation on the GPU, set" + s" ${RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP} to true.") } + // FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019 + if (!conf.isCastStringToDecimalEnabled && cast.child.dataType == DataTypes.StringType && + cast.dataType.isInstanceOf[DecimalType]) { + willNotWorkOnGpu("Currently string to decimal type on the GPU might produce results which " + + "slightly differed from the correct results when the string represents any number " + + "exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU " + + "returns 99999999999999987 given input string \"99999999999999999\". The cause of " + + "divergence is that we can not cast strings containing scientific notation to decimal " + + "directly. So, we have to cast strings to floats firstly. Then, cast floats to decimals. " + + "The first step may lead to precision loss. To enable this operation on the GPU, set " + + s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.") + } } def buildTagMessage(entry: ConfEntry[_]): String = { @@ -387,6 +399,14 @@ case class GpuCast( } } } + case (StringType, dt: DecimalType) => + // To apply HALF_UP rounding strategy during casting to decimal, we firstly cast + // string to fp64. Then, cast fp64 to target decimal type to enforce HALF_UP rounding. + withResource(input.getBase.strip()) { trimmed => + withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp => + castFloatsToDecimal(fp, dt) + } + } case (ShortType | IntegerType | LongType | ByteType | StringType, BinaryType) => input.getBase.asByteList(true) @@ -1050,16 +1070,25 @@ case class GpuCast( } withResource(checkedInput) { checked => + val targetType = DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale) // If target scale reaches DECIMAL64_MAX_PRECISION, container DECIMAL can not // be created because of precision overflow. In this case, we perform casting op directly. - if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { - checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + val casted = if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { + checked.castTo(targetType) } else { val containerType = DType.create(DType.DTypeEnum.DECIMAL64, -(dt.scale + 1)) withResource(checked.castTo(containerType)) { container => container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) } } + // Cast NaN values to nulls + withResource(casted) { casted => + withResource(input.isNan) { inputIsNan => + withResource(Scalar.fromNull(targetType)) { nullScalar => + inputIsNan.ifElse(nullScalar, casted) + } + } + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 4d91f46aece..0f19b654ae6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -563,6 +563,18 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val ENABLE_CAST_STRING_TO_DECIMAL = conf("spark.rapids.sql.castStringToDecimal.enabled") + .doc("When set to true, enables casting from strings to decimal type on the GPU. Currently " + + "string to decimal type on the GPU might produce results which slightly differed from the " + + "correct results when the string represents any number exceeding the max precision that " + + "CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given " + + "input string \"99999999999999999\". The cause of divergence is that we can not cast " + + "strings containing scientific notation to decimal directly. So, we have to cast strings " + + "to floats firstly. Then, cast floats to decimals. The first step may lead to precision " + + "loss.") + .booleanConf + .createWithDefault(false) + val ENABLE_CAST_STRING_TO_TIMESTAMP = conf("spark.rapids.sql.castStringToTimestamp.enabled") .doc("When set to true, casting from string to timestamp is supported on the GPU. The GPU " + "only supports a subset of formats when casting strings to timestamps. Refer to the CAST " + @@ -1177,6 +1189,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCastStringToFloatEnabled: Boolean = get(ENABLE_CAST_STRING_TO_FLOAT) + lazy val isCastStringToDecimalEnabled: Boolean = get(ENABLE_CAST_STRING_TO_DECIMAL) + lazy val isCastFloatToIntegralTypesEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES) lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 4a98c089e33..bfd8a6b9b94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -748,7 +748,7 @@ class CastChecks extends ExprChecks { val timestampChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING val sparkTimestampSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING - val stringChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY + val stringChecks: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY val sparkStringSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + CALENDAR + STRING + BINARY val binaryChecks: TypeSig = none diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 667f4f1be47..b86c16da5f7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -439,6 +439,16 @@ class CastOpSuite extends GpuExpressionTestSuite { } } + test("cast float to decimal (include NaN/INF/-INF)") { + def floatsIncludeNaNs(ss: SparkSession): DataFrame = { + mixedFloatDf(ss).select(col("floats").as("col")) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.FloatType, scale, + customDataGenerator = Some(floatsIncludeNaNs)) + } + } + test("cast double to decimal") { List(-18, -10, -3, 0, 1, 5, 15).foreach { scale => testCastToDecimal(DataTypes.DoubleType, scale, @@ -446,6 +456,16 @@ class CastOpSuite extends GpuExpressionTestSuite { } } + test("cast double to decimal (include NaN/INF/-INF)") { + def doublesIncludeNaNs(ss: SparkSession): DataFrame = { + mixedDoubleDf(ss).select(col("doubles").as("col")) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.DoubleType, scale, + customDataGenerator = Some(doublesIncludeNaNs)) + } + } + test("cast decimal to decimal") { // fromScale == toScale testCastToDecimal(DataTypes.createDecimalType(18, 0), @@ -574,6 +594,53 @@ class CastOpSuite extends GpuExpressionTestSuite { generator = decimalGenerator(Seq(Decimal(100000000L)), decType)) } + test("cast string to decimal") { + List(-18, -10, -3, 0, 1, 5, 15).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale, + customRandGenerator = Some(new scala.util.Random(1234L))) + } + } + + test("cast string to decimal (include NaN/INF/-INF)") { + def doubleStrings(ss: SparkSession): DataFrame = { + val df1 = floatsAsStrings(ss).selectExpr("cast(c0 as Double) as col") + val df2 = doublesAsStrings(ss).select(col("c0").as("col")) + df1.unionAll(df2) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale = scale, + customDataGenerator = Some(doubleStrings)) + } + } + + test("cast string to decimal (truncated cases)") { + def specialGenerator(column: Seq[String])(ss: SparkSession): DataFrame = { + import ss.sqlContext.implicits._ + column.toDF("col") + } + testCastToDecimal(DataTypes.StringType, scale = 7, + customDataGenerator = Some(specialGenerator(Seq("9999999999")))) + testCastToDecimal(DataTypes.StringType, scale = 2, + customDataGenerator = Some(specialGenerator(Seq("999999999999999")))) + testCastToDecimal(DataTypes.StringType, scale = 0, + customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) + testCastToDecimal(DataTypes.StringType, scale = -1, + customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) + testCastToDecimal(DataTypes.StringType, scale = -10, + customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) + } + + test("ansi_cast string to decimal exp") { + def exponentsAsStrings(ss: SparkSession): DataFrame = { + exponentsAsStringsDf(ss).select(col("c0").as("col")) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale = scale, + customDataGenerator = Some(exponentsAsStrings), + ansiEnabled = true) + } + } + protected def testCastToDecimal( dataType: DataType, scale: Int, @@ -592,13 +659,14 @@ class CastOpSuite extends GpuExpressionTestSuite { val conf = new SparkConf() .set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true") .set(RapidsConf.ENABLE_CAST_FLOAT_TO_DECIMAL.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_DECIMAL.key, "true") .set("spark.rapids.sql.exec.FileSourceScanExec", "false") .set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true") .set("spark.sql.ansi.enabled", ansiEnabled.toString) val defaultRandomGenerator: SparkSession => DataFrame = { val rnd = customRandGenerator.getOrElse(new scala.util.Random(1234L)) - generateCastNumericToDecimalDataFrame(dataType, precision - scale, rnd, 500) + generateCastToDecimalDataFrame(dataType, precision - scale, rnd, 500) } val generator = customDataGenerator.getOrElse(defaultRandomGenerator) withCpuSparkSession(spark => generator(spark).write.parquet(path), conf) @@ -613,7 +681,7 @@ class CastOpSuite extends GpuExpressionTestSuite { val (cpuResult, gpuResult) = dataType match { case ShortType | IntegerType | LongType | _: DecimalType => fromCpu.map(r => Row(r.getDecimal(1))) -> fromGpu.map(r => Row(r.getDecimal(1))) - case FloatType | DoubleType => + case FloatType | DoubleType | StringType => // There may be tiny difference between CPU and GPU result when casting from double val fetchFromRow = (r: Row) => { if (r.isNullAt(1)) Double.NaN @@ -630,7 +698,7 @@ class CastOpSuite extends GpuExpressionTestSuite { } } - private def generateCastNumericToDecimalDataFrame( + private def generateCastToDecimalDataFrame( dataType: DataType, integralSize: Int, rndGenerator: scala.util.Random, @@ -656,7 +724,7 @@ class CastOpSuite extends GpuExpressionTestSuite { enhancedRnd.nextLong() / math.pow(10, scale max 9).toLong case LongType => enhancedRnd.nextLong() / math.pow(10, scale max 0).toLong - case FloatType | DoubleType => + case FloatType | DoubleType | StringType => enhancedRnd.nextLong() / math.pow(10, scale + 2) case dt: DecimalType => val unscaledValue = (enhancedRnd.nextLong() * math.pow(10, dt.precision - 18)).toLong @@ -676,6 +744,8 @@ class CastOpSuite extends GpuExpressionTestSuite { rawColumn.map(_.asInstanceOf[Double].toFloat).toDF("col") case DoubleType => rawColumn.map(_.asInstanceOf[Double]).toDF("col") + case StringType => + rawColumn.map(_.asInstanceOf[Double].toString).toDF("col") case dt: DecimalType => val row = rawColumn.map(e => Row(e.asInstanceOf[Decimal])).asJava ss.createDataFrame(row, StructType(Seq(StructField("col", dt))))