support casting string to decimal (NVIDIA#1999)
Signed-off-by: sperlingxx <[email protected]>
sperlingxx authored Mar 25, 2021
1 parent 8442066 commit 52e7cf4
Showing 6 changed files with 123 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
@@ -54,6 +54,7 @@ Name | Description | Default Value
<a name="sql.castFloatToDecimal.enabled"></a>spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|false
<a name="sql.castFloatToIntegralTypes.enabled"></a>spark.rapids.sql.castFloatToIntegralTypes.enabled|Casting from floating point types to integral types on the GPU supports a slightly different range of values when using Spark 3.1.0 or later. Refer to the CAST documentation for more details.|false
<a name="sql.castFloatToString.enabled"></a>spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|false
<a name="sql.castStringToDecimal.enabled"></a>spark.rapids.sql.castStringToDecimal.enabled|When set to true, enables casting from strings to decimal type on the GPU. Currently string to decimal type on the GPU might produce results which differ slightly from the correct results when the string represents any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given input string "99999999999999999". The cause of divergence is that we cannot cast strings containing scientific notation to decimal directly. So, we first cast strings to floats, then cast floats to decimals. The first step may lead to precision loss.|false
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToInteger.enabled"></a>spark.rapids.sql.castStringToInteger.enabled|When set to true, enables casting from strings to integer types (byte, short, int, long) on the GPU. Casting from string to integer types on the GPU returns incorrect results when the string represents a number larger than Long.MaxValue or smaller than Long.MinValue.|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
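The precision loss described for `spark.rapids.sql.castStringToDecimal.enabled` can be reproduced on the JVM without a GPU. The following is a minimal sketch, assuming `java.math.BigDecimal` and `String.toDouble` as stand-ins for the GPU kernels. The object and method names are hypothetical and not part of the plugin, and the exact digits produced by the GPU's string-to-float parser may differ from the JVM's.

```scala
import java.math.{BigDecimal => JBigDecimal}

// Hypothetical helper, not part of the plugin: contrasts a direct
// string-to-decimal parse with the string -> fp64 -> decimal detour.
object PrecisionLossDemo {
  // Exact parse: every digit of the input is preserved.
  def direct(s: String): JBigDecimal = new JBigDecimal(s)

  // Detour through fp64: only about 15 to 17 significant decimal digits survive.
  def viaDouble(s: String): JBigDecimal = new JBigDecimal(s.toDouble)

  def main(args: Array[String]): Unit = {
    val input = "99999999999999999" // 17 significant digits
    println(s"direct:    ${direct(input)}")
    println(s"viaDouble: ${viaDouble(input)}")
  }
}
```

On the JVM the detour yields 100000000000000000 rather than the 99999999999999987 quoted for the GPU. In both cases the last digit of the input does not survive the trip through fp64.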
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -18029,7 +18029,7 @@ and the accelerator produces the same result.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td> </td>
<td>S</td>
<td><b>NS</b></td>
@@ -18433,7 +18433,7 @@ and the accelerator produces the same result.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td> </td>
<td>S</td>
<td><b>NS</b></td>
33 changes: 31 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -78,6 +78,18 @@ class CastExprMeta[INPUT <: CastBase](
"for more details. To enable this operation on the GPU, set" +
s" ${RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP} to true.")
}
// FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019
if (!conf.isCastStringToDecimalEnabled && cast.child.dataType == DataTypes.StringType &&
cast.dataType.isInstanceOf[DecimalType]) {
willNotWorkOnGpu("Currently string to decimal type on the GPU might produce results which " +
"differ slightly from the correct results when the string represents any number " +
"exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU " +
"returns 99999999999999987 given input string \"99999999999999999\". The cause of " +
"divergence is that we cannot cast strings containing scientific notation to decimal " +
"directly. So, we first cast strings to floats, then cast floats to decimals. " +
"The first step may lead to precision loss. To enable this operation on the GPU, set " +
s"${RapidsConf.ENABLE_CAST_STRING_TO_DECIMAL} to true.")
}
}

def buildTagMessage(entry: ConfEntry[_]): String = {
@@ -387,6 +399,14 @@ case class GpuCast(
}
}
}
case (StringType, dt: DecimalType) =>
// To apply the HALF_UP rounding strategy when casting to decimal, we first cast the
// string to fp64, then cast fp64 to the target decimal type, which enforces HALF_UP rounding.
withResource(input.getBase.strip()) { trimmed =>
withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp =>
castFloatsToDecimal(fp, dt)
}
}

case (ShortType | IntegerType | LongType | ByteType | StringType, BinaryType) =>
input.getBase.asByteList(true)
@@ -1050,16 +1070,25 @@ case class GpuCast(
}

withResource(checkedInput) { checked =>
val targetType = DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)
// If the target scale reaches DECIMAL64_MAX_PRECISION, the container DECIMAL cannot
// be created because of precision overflow. In this case, we perform the cast directly.
if (DType.DECIMAL64_MAX_PRECISION == dt.scale) {
checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale))
val casted = if (DType.DECIMAL64_MAX_PRECISION == dt.scale) {
checked.castTo(targetType)
} else {
val containerType = DType.create(DType.DTypeEnum.DECIMAL64, -(dt.scale + 1))
withResource(checked.castTo(containerType)) { container =>
container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP)
}
}
// Cast NaN values to nulls
withResource(casted) { casted =>
withResource(input.isNan) { inputIsNan =>
withResource(Scalar.fromNull(targetType)) { nullScalar =>
inputIsNan.ifElse(nullScalar, casted)
}
}
}
}
}
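The string-to-decimal path in this file (strip, cast to fp64, cast to a container with one extra fractional digit, round HALF_UP, map NaN to null) can be modelled on the CPU. The sketch below makes several assumptions: the cast into the container truncates, invalid strings become null as in non-ANSI mode, and infinities are also treated as null. `StringToDecimalSketch` and `castStringToDecimal` are hypothetical names, and `java.math.BigDecimal` stands in for the cudf column operations.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Hypothetical CPU-side model of the GPU path; names are illustrative only.
object StringToDecimalSketch {
  def castStringToDecimal(s: String, scale: Int): Option[JBigDecimal] = {
    // Step 1: strip surrounding whitespace, as input.getBase.strip() does.
    val trimmed = s.trim
    // Step 2: string -> fp64. Unparseable input becomes NaN (non-ANSI assumption).
    val fp = try trimmed.toDouble catch { case _: NumberFormatException => Double.NaN }
    // Step 3: NaN maps to null; infinities are mapped to None as well (assumption).
    if (java.lang.Double.isNaN(fp) || java.lang.Double.isInfinite(fp)) {
      None
    } else {
      // Step 4: truncate into a container with one extra fractional digit,
      // then round HALF_UP to the target scale.
      val container = new JBigDecimal(fp).setScale(scale + 1, RoundingMode.DOWN)
      Some(container.setScale(scale, RoundingMode.HALF_UP))
    }
  }
}
```

For example, `StringToDecimalSketch.castStringToDecimal(" 2.5 ", 0)` strips the input, parses 2.5, and rounds half up. Scientific notation such as `"1e2"` is accepted because the fp64 parser handles it, which is the reason the commit gives for taking the detour through floats.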

14 changes: 14 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -563,6 +563,18 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val ENABLE_CAST_STRING_TO_DECIMAL = conf("spark.rapids.sql.castStringToDecimal.enabled")
.doc("When set to true, enables casting from strings to decimal type on the GPU. Currently " +
"string to decimal type on the GPU might produce results which differ slightly from the " +
"correct results when the string represents any number exceeding the max precision that " +
"CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given " +
"input string \"99999999999999999\". The cause of divergence is that we cannot cast " +
"strings containing scientific notation to decimal directly. So, we first cast strings " +
"to floats, then cast floats to decimals. The first step may lead to precision " +
"loss.")
.booleanConf
.createWithDefault(false)

val ENABLE_CAST_STRING_TO_TIMESTAMP = conf("spark.rapids.sql.castStringToTimestamp.enabled")
.doc("When set to true, casting from string to timestamp is supported on the GPU. The GPU " +
"only supports a subset of formats when casting strings to timestamps. Refer to the CAST " +
@@ -1177,6 +1189,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCastStringToFloatEnabled: Boolean = get(ENABLE_CAST_STRING_TO_FLOAT)

lazy val isCastStringToDecimalEnabled: Boolean = get(ENABLE_CAST_STRING_TO_DECIMAL)

lazy val isCastFloatToIntegralTypesEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES)

lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS)
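Because the new option defaults to false, string-to-decimal casts stay on the CPU unless it is set explicitly. The sketch below collects configuration keys that appear in this commit's docs and test setup and renders them as `--conf` arguments. `CastConfSketch` and `toSubmitArgs` are hypothetical names, and whether every listed key is required for a given workload is an assumption taken from the test configuration.

```scala
// Hypothetical helper: renders settings as spark-submit / spark-shell arguments.
object CastConfSketch {
  // Keys are copied from this commit's docs and test setup.
  val settings: Seq[(String, String)] = Seq(
    "spark.rapids.sql.castStringToDecimal.enabled" -> "true",
    "spark.rapids.sql.castFloatToDecimal.enabled" -> "true",
    // Only needed when the target decimal type has a negative scale.
    "spark.sql.legacy.allowNegativeScaleOfDecimal" -> "true"
  )

  // Each pair becomes two arguments: "--conf" and "key=value".
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(settings).mkString(" "))
}
```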
@@ -748,7 +748,7 @@ class CastChecks extends ExprChecks {
val timestampChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING
val sparkTimestampSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING

val stringChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY
val stringChecks: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY
val sparkStringSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + CALENDAR + STRING + BINARY

val binaryChecks: TypeSig = none
78 changes: 74 additions & 4 deletions tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
@@ -439,13 +439,33 @@ class CastOpSuite extends GpuExpressionTestSuite {
}
}

test("cast float to decimal (include NaN/INF/-INF)") {
def floatsIncludeNaNs(ss: SparkSession): DataFrame = {
mixedFloatDf(ss).select(col("floats").as("col"))
}
List(-10, -1, 0, 1, 10).foreach { scale =>
testCastToDecimal(DataTypes.FloatType, scale,
customDataGenerator = Some(floatsIncludeNaNs))
}
}

test("cast double to decimal") {
List(-18, -10, -3, 0, 1, 5, 15).foreach { scale =>
testCastToDecimal(DataTypes.DoubleType, scale,
customRandGenerator = Some(new scala.util.Random(1234L)))
}
}

test("cast double to decimal (include NaN/INF/-INF)") {
def doublesIncludeNaNs(ss: SparkSession): DataFrame = {
mixedDoubleDf(ss).select(col("doubles").as("col"))
}
List(-10, -1, 0, 1, 10).foreach { scale =>
testCastToDecimal(DataTypes.DoubleType, scale,
customDataGenerator = Some(doublesIncludeNaNs))
}
}

test("cast decimal to decimal") {
// fromScale == toScale
testCastToDecimal(DataTypes.createDecimalType(18, 0),
@@ -574,6 +594,53 @@ class CastOpSuite extends GpuExpressionTestSuite {
generator = decimalGenerator(Seq(Decimal(100000000L)), decType))
}

test("cast string to decimal") {
List(-18, -10, -3, 0, 1, 5, 15).foreach { scale =>
testCastToDecimal(DataTypes.StringType, scale,
customRandGenerator = Some(new scala.util.Random(1234L)))
}
}

test("cast string to decimal (include NaN/INF/-INF)") {
def doubleStrings(ss: SparkSession): DataFrame = {
val df1 = floatsAsStrings(ss).selectExpr("cast(c0 as Double) as col")
val df2 = doublesAsStrings(ss).select(col("c0").as("col"))
df1.unionAll(df2)
}
List(-10, -1, 0, 1, 10).foreach { scale =>
testCastToDecimal(DataTypes.StringType, scale = scale,
customDataGenerator = Some(doubleStrings))
}
}

test("cast string to decimal (truncated cases)") {
def specialGenerator(column: Seq[String])(ss: SparkSession): DataFrame = {
import ss.sqlContext.implicits._
column.toDF("col")
}
testCastToDecimal(DataTypes.StringType, scale = 7,
customDataGenerator = Some(specialGenerator(Seq("9999999999"))))
testCastToDecimal(DataTypes.StringType, scale = 2,
customDataGenerator = Some(specialGenerator(Seq("999999999999999"))))
testCastToDecimal(DataTypes.StringType, scale = 0,
customDataGenerator = Some(specialGenerator(Seq("99999999999999999"))))
testCastToDecimal(DataTypes.StringType, scale = -1,
customDataGenerator = Some(specialGenerator(Seq("99999999999999999"))))
testCastToDecimal(DataTypes.StringType, scale = -10,
customDataGenerator = Some(specialGenerator(Seq("99999999999999999"))))
}

test("ansi_cast string to decimal exp") {
def exponentsAsStrings(ss: SparkSession): DataFrame = {
exponentsAsStringsDf(ss).select(col("c0").as("col"))
}
List(-10, -1, 0, 1, 10).foreach { scale =>
testCastToDecimal(DataTypes.StringType, scale = scale,
customDataGenerator = Some(exponentsAsStrings),
ansiEnabled = true)
}
}

protected def testCastToDecimal(
dataType: DataType,
scale: Int,
@@ -592,13 +659,14 @@ class CastOpSuite extends GpuExpressionTestSuite {
val conf = new SparkConf()
.set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true")
.set(RapidsConf.ENABLE_CAST_FLOAT_TO_DECIMAL.key, "true")
.set(RapidsConf.ENABLE_CAST_STRING_TO_DECIMAL.key, "true")
.set("spark.rapids.sql.exec.FileSourceScanExec", "false")
.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
.set("spark.sql.ansi.enabled", ansiEnabled.toString)

val defaultRandomGenerator: SparkSession => DataFrame = {
val rnd = customRandGenerator.getOrElse(new scala.util.Random(1234L))
generateCastNumericToDecimalDataFrame(dataType, precision - scale, rnd, 500)
generateCastToDecimalDataFrame(dataType, precision - scale, rnd, 500)
}
val generator = customDataGenerator.getOrElse(defaultRandomGenerator)
withCpuSparkSession(spark => generator(spark).write.parquet(path), conf)
@@ -613,7 +681,7 @@ class CastOpSuite extends GpuExpressionTestSuite {
val (cpuResult, gpuResult) = dataType match {
case ShortType | IntegerType | LongType | _: DecimalType =>
fromCpu.map(r => Row(r.getDecimal(1))) -> fromGpu.map(r => Row(r.getDecimal(1)))
case FloatType | DoubleType =>
case FloatType | DoubleType | StringType =>
// There may be a tiny difference between CPU and GPU results when casting from double or string
val fetchFromRow = (r: Row) => {
if (r.isNullAt(1)) Double.NaN
@@ -630,7 +698,7 @@ class CastOpSuite extends GpuExpressionTestSuite {
}
}

private def generateCastNumericToDecimalDataFrame(
private def generateCastToDecimalDataFrame(
dataType: DataType,
integralSize: Int,
rndGenerator: scala.util.Random,
@@ -656,7 +724,7 @@ class CastOpSuite extends GpuExpressionTestSuite {
enhancedRnd.nextLong() / math.pow(10, scale max 9).toLong
case LongType =>
enhancedRnd.nextLong() / math.pow(10, scale max 0).toLong
case FloatType | DoubleType =>
case FloatType | DoubleType | StringType =>
enhancedRnd.nextLong() / math.pow(10, scale + 2)
case dt: DecimalType =>
val unscaledValue = (enhancedRnd.nextLong() * math.pow(10, dt.precision - 18)).toLong
@@ -676,6 +744,8 @@ class CastOpSuite extends GpuExpressionTestSuite {
rawColumn.map(_.asInstanceOf[Double].toFloat).toDF("col")
case DoubleType =>
rawColumn.map(_.asInstanceOf[Double]).toDF("col")
case StringType =>
rawColumn.map(_.asInstanceOf[Double].toString).toDF("col")
case dt: DecimalType =>
val row = rawColumn.map(e => Row(e.asInstanceOf[Decimal])).asJava
ss.createDataFrame(row, StructType(Seq(StructField("col", dt))))
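The test now routes `StringType` through the same branch as `FloatType` and `DoubleType`, where nulls are read back as `Double.NaN` and CPU and GPU values are compared with some tolerance. The comparison itself is not visible in this diff, so the following is only a sketch of one way such a check could look. `ApproxCompareSketch`, `approxEqual`, and the default relative tolerance are all assumptions.

```scala
// Hypothetical comparison helper; the tolerance value is an assumption.
object ApproxCompareSketch {
  // Nulls are modelled as NaN, mirroring fetchFromRow in the test.
  def approxEqual(cpu: Double, gpu: Double, relEps: Double = 1e-9): Boolean = {
    if (cpu.isNaN || gpu.isNaN) {
      // A null on one side must be matched by a null on the other.
      cpu.isNaN && gpu.isNaN
    } else if (cpu == gpu) {
      true
    } else {
      // Relative tolerance scaled by the larger magnitude.
      math.abs(cpu - gpu) <= relEps * math.max(math.abs(cpu), math.abs(gpu))
    }
  }
}
```

Under this tolerance, the documented divergence between 99999999999999987 and the value the CPU produces for "99999999999999999" counts as a match, while a null on only one side does not.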