diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index ed56fe63359..25973533021 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -20587,7 +20587,7 @@ and the accelerator produces the same result.
+PS The array's child type must also support being cast to the desired child type; max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP; missing nested CALENDAR, UDT |
@@ -20609,7 +20609,7 @@ and the accelerator produces the same result.
-NS |
+PS the map's key and value must also support being cast to the desired child types; max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP; missing nested CALENDAR, UDT |
@@ -20631,7 +20631,7 @@ and the accelerator produces the same result.
-NS |
+PS the struct's children must also support being cast to the desired child type(s); max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP; missing nested CALENDAR, UDT |
@@ -20991,7 +20991,7 @@ and the accelerator produces the same result.
+PS The array's child type must also support being cast to the desired child type; max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP; missing nested CALENDAR, UDT |
@@ -21013,7 +21013,7 @@ and the accelerator produces the same result.
-NS |
+PS the map's key and value must also support being cast to the desired child types; max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP; missing nested CALENDAR, UDT |
@@ -21035,7 +21035,7 @@ and the accelerator produces the same result.
-NS |
+PS the struct's children must also support being cast to the desired child type(s); max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP; missing nested CALENDAR, UDT |
diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py
index 298918ed4b4..655db6e9353 100644
--- a/integration_tests/src/main/python/array_test.py
+++ b/integration_tests/src/main/python/array_test.py
@@ -165,53 +165,3 @@ def test_array_element_at_all_null_ansi_not_fail(data_gen):
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
-@pytest.mark.parametrize('child_gen', [
- float_gen,
- double_gen,
- int_gen
-], ids=idfn)
-@pytest.mark.parametrize('child_to_type', [
- FloatType(),
- DoubleType(),
- IntegerType(),
-], ids=idfn)
-@pytest.mark.parametrize('depth', [1, 2, 3], ids=idfn)
-def test_array_cast_recursive(child_gen, child_to_type, depth):
- def cast_func(spark):
- depth_rng = range(0, depth)
- nested_gen = reduce(lambda dg, i: ArrayGen(dg, max_length=int(max(1, 16 / (2 ** i)))),
- depth_rng, child_gen)
- nested_type = reduce(lambda t, _: ArrayType(t), depth_rng, child_to_type)
- df = two_col_df(spark, int_gen, nested_gen)
- res = df.select(df.b.cast(nested_type))
- return res
- assert_gpu_and_cpu_are_equal_collect(cast_func)
-@allow_non_gpu('ProjectExec', 'Alias', 'Cast')
-def test_array_cast_fallback():
- def cast_float_to_double(spark):
- df = two_col_df(spark, int_gen, ArrayGen(int_gen))
- res = df.select(df.b.cast(ArrayType(StringType())))
- return res
- assert_gpu_and_cpu_are_equal_collect(cast_float_to_double)
-@pytest.mark.parametrize('child_gen', [
- byte_gen,
- string_gen,
- decimal_gen_default,
-], ids=idfn)
-@pytest.mark.parametrize('child_to_type', [
- FloatType(),
- DoubleType(),
- IntegerType(),
-], ids=idfn)
-@allow_non_gpu('ProjectExec', 'Alias', 'Cast')
-def test_array_cast_bad_from_good_to_fallback(child_gen, child_to_type):
- def cast_array(spark):
- df = two_col_df(spark, int_gen, ArrayGen(child_gen))
- res = df.select(df.b.cast(ArrayType(child_to_type)))
- return res
- assert_gpu_and_cpu_are_equal_collect(cast_array)
\ No newline at end of file
diff --git a/integration_tests/src/main/python/cast_test.py b/integration_tests/src/main/python/cast_test.py
index d04066f4cb2..3e2f28ca1f7 100644
--- a/integration_tests/src/main/python/cast_test.py
+++ b/integration_tests/src/main/python/cast_test.py
@@ -30,3 +30,26 @@ def test_cast_empty_string_to_int():
'CAST(a as LONG)'))
+# These tests are not intended to be exhaustive. The scala test CastOpSuite should cover
+# just about everything for non-nested values. This is intended to check that the
+# recursive code in nested type checks, like arrays, is working properly. So we are going
+# pick child types that are simple to cast. Upcasting integer values and casting them to strings
+@pytest.mark.parametrize('data_gen,to_type', [
+ (ArrayGen(byte_gen), ArrayType(IntegerType())),
+ (ArrayGen(StringGen('[0-9]{1,5}')), ArrayType(IntegerType())),
+ (ArrayGen(byte_gen), ArrayType(StringType())),
+ (ArrayGen(byte_gen), ArrayType(DecimalType(6, 2))),
+ (ArrayGen(ArrayGen(byte_gen)), ArrayType(ArrayType(IntegerType()))),
+ (ArrayGen(ArrayGen(byte_gen)), ArrayType(ArrayType(StringType()))),
+ (ArrayGen(ArrayGen(byte_gen)), ArrayType(ArrayType(DecimalType(6, 2)))),
+ (StructGen([('a', byte_gen)]), StructType([StructField('a', IntegerType())])),
+ (StructGen([('a', byte_gen), ('c', short_gen)]), StructType([StructField('b', IntegerType()), StructField('c', ShortType())])),
+ (StructGen([('a', ArrayGen(byte_gen)), ('c', short_gen)]), StructType([StructField('a', ArrayType(IntegerType())), StructField('c', LongType())])),
+ (ArrayGen(StructGen([('a', byte_gen), ('b', byte_gen)])), ArrayType(StringType())),
+ (MapGen(ByteGen(nullable=False), byte_gen), MapType(StringType(), StringType())),
+ (MapGen(ShortGen(nullable=False), ArrayGen(byte_gen)), MapType(IntegerType(), ArrayType(ShortType()))),
+ (MapGen(ShortGen(nullable=False), ArrayGen(StructGen([('a', byte_gen)]))), MapType(IntegerType(), ArrayType(StructType([StructField('b', ShortType())]))))
+ ], ids=idfn)
+def test_cast_nested(data_gen, to_type):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type)))
diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala
index 44033dc8ae7..a4ad64b48ba 100644
--- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala
+++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala
@@ -147,13 +147,22 @@ class Spark311Shims extends Spark301Shims {
// calendarChecks are the same
- override val arrayChecks: TypeSig = none
+ override val arrayChecks: TypeSig =
+ ARRAY.nested(commonCudfTypes + DECIMAL_64 + NULL + ARRAY + BINARY + STRUCT) +
+ psNote(TypeEnum.ARRAY, "The array's child type must also support being cast to " +
+ "the desired child type")
override val sparkArraySig: TypeSig = ARRAY.nested(all)
- override val mapChecks: TypeSig = none
+ override val mapChecks: TypeSig =
+ MAP.nested(commonCudfTypes + DECIMAL_64 + NULL + ARRAY + BINARY + STRUCT + MAP) +
+ psNote(TypeEnum.MAP, "the map's key and value must also support being cast to the " +
+ "desired child types")
override val sparkMapSig: TypeSig = MAP.nested(all)
- override val structChecks: TypeSig = none
+ override val structChecks: TypeSig =
+ STRUCT.nested(commonCudfTypes + DECIMAL_64 + NULL + ARRAY + BINARY + STRUCT) +
+ psNote(TypeEnum.STRUCT, "the struct's children must also support being cast to the " +
+ "desired child type(s)")
override val sparkStructSig: TypeSig = STRUCT.nested(all)
override val udtChecks: TypeSig = none
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
index 561448c16e7..a574b0124cd 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -39,9 +39,9 @@ class CastExprMeta[INPUT <: CastBase](
rule: DataFromReplacementRule)
extends UnaryExprMeta[INPUT](cast, conf, parent, rule) {
- val fromType = cast.child.dataType
- val toType = cast.dataType
- var legacyCastToString = ShimLoader.getSparkShims.getLegacyComplexTypeToString()
+ val fromType: DataType = cast.child.dataType
+ val toType: DataType = cast.dataType
+ val legacyCastToString: Boolean = ShimLoader.getSparkShims.getLegacyComplexTypeToString()
override def tagExprForGpu(): Unit = recursiveTagExprForGpuCheck()
@@ -49,80 +49,58 @@ class CastExprMeta[INPUT <: CastBase](
fromDataType: DataType = fromType,
toDataType: DataType = toType)
: Unit = {
- if (!conf.isCastFloatToDecimalEnabled && toDataType.isInstanceOf[DecimalType] &&
- (fromDataType == DataTypes.FloatType || fromDataType == DataTypes.DoubleType)) {
- willNotWorkOnGpu("the GPU will use a different strategy from Java's BigDecimal to convert " +
- "floating point data types to decimals and this can produce results that slightly " +
- "differ from the default behavior in Spark. To enable this operation on the GPU, set " +
- s"${RapidsConf.ENABLE_CAST_FLOAT_TO_DECIMAL} to true.")
- }
- if (!conf.isCastFloatToStringEnabled && toDataType == DataTypes.StringType &&
- (fromDataType == DataTypes.FloatType || fromDataType == DataTypes.DoubleType)) {
- willNotWorkOnGpu("the GPU will use different precision than Java's toString method when " +
- "converting floating point data types to strings and this can produce results that " +
- "differ from the default behavior in Spark. To enable this operation on the GPU, set" +
- s" ${RapidsConf.ENABLE_CAST_FLOAT_TO_STRING} to true.")
- }
- if (!conf.isCastStringToFloatEnabled && cast.child.dataType == DataTypes.StringType &&
- Seq(DataTypes.FloatType, DataTypes.DoubleType).contains(cast.dataType)) {
- willNotWorkOnGpu("Currently hex values aren't supported on the GPU. Also note " +
- "that casting from string to float types on the GPU returns incorrect results when the " +
- "string represents any number \"1.7976931348623158E308\" <= x < " +
- "\"1.7976931348623159E308\" and \"-1.7976931348623159E308\" < x <= " +
- "\"-1.7976931348623158E308\" in both these cases the GPU returns Double.MaxValue while " +
- "CPU returns \"+Infinity\" and \"-Infinity\" respectively. To enable this operation on " +
- "the GPU, set" + s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.")
- }
- if (!conf.isCastStringToTimestampEnabled && fromDataType == DataTypes.StringType
- && toDataType == DataTypes.TimestampType) {
- willNotWorkOnGpu("the GPU only supports a subset of formats " +
- "when casting strings to timestamps. Refer to the CAST documentation " +
- "for more details. To enable this operation on the GPU, set" +
- s" ${RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP} to true.")
- }
- // FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019
- if (!conf.isCastStringToDecimalEnabled && cast.child.dataType == DataTypes.StringType &&
- cast.dataType.isInstanceOf[DecimalType]) {
- willNotWorkOnGpu("Currently string to decimal type on the GPU might produce results which " +
- "slightly differed from the correct results when the string represents any number " +
- "exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU " +
- "returns 99999999999999987 given input string \"99999999999999999\". The cause of " +
- "divergence is that we can not cast strings containing scientific notation to decimal " +
- "directly. So, we have to cast strings to floats firstly. Then, cast floats to decimals. " +
- "The first step may lead to precision loss. To enable this operation on the GPU, set " +
- s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.")
- }
- if (fromDataType.isInstanceOf[StructType]) {
- val checks = rule.getChecks.get.asInstanceOf[CastChecks]
- fromDataType.asInstanceOf[StructType].foreach{field =>
- recursiveTagExprForGpuCheck(field.dataType)
- if (toDataType == StringType) {
- if (!checks.gpuCanCast(field.dataType, toDataType)) {
- willNotWorkOnGpu(s"Unsupported type ${field.dataType} found in Struct column. " +
- s"Casting ${field.dataType} to ${toDataType} not currently supported. Refer to " +
- "CAST documentation for more details.")
- }
+ (fromDataType, toDataType) match {
+ case (_: FloatType | _: DoubleType, _: DecimalType) if !conf.isCastFloatToDecimalEnabled =>
+ willNotWorkOnGpu("the GPU will use a different strategy from Java's BigDecimal " +
+ "to convert floating point data types to decimals and this can produce results that " +
+ "slightly differ from the default behavior in Spark. To enable this operation on " +
+ s"the GPU, set ${RapidsConf.ENABLE_CAST_FLOAT_TO_DECIMAL} to true.")
+ case (_: FloatType | _: DoubleType, _: StringType) if !conf.isCastFloatToStringEnabled =>
+ willNotWorkOnGpu("the GPU will use different precision than Java's toString method when " +
+ "converting floating point data types to strings and this can produce results that " +
+ "differ from the default behavior in Spark. To enable this operation on the GPU, set" +
+ s" ${RapidsConf.ENABLE_CAST_FLOAT_TO_STRING} to true.")
+ case (_: StringType, _: FloatType | _: DoubleType) if !conf.isCastStringToFloatEnabled =>
+ willNotWorkOnGpu("Currently hex values aren't supported on the GPU. Also note " +
+ "that casting from string to float types on the GPU returns incorrect results when " +
+ "the string represents any number \"1.7976931348623158E308\" <= x < " +
+ "\"1.7976931348623159E308\" and \"-1.7976931348623159E308\" < x <= " +
+ "\"-1.7976931348623158E308\" in both these cases the GPU returns Double.MaxValue " +
+ "while CPU returns \"+Infinity\" and \"-Infinity\" respectively. To enable this " +
+ s"operation on the GPU, set ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.")
+ case (_: StringType, _: TimestampType) if !conf.isCastStringToTimestampEnabled =>
+ willNotWorkOnGpu("the GPU only supports a subset of formats " +
+ "when casting strings to timestamps. Refer to the CAST documentation " +
+ "for more details. To enable this operation on the GPU, set" +
+ s" ${RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP} to true.")
+ case (_: StringType, _: DecimalType) if !conf.isCastStringToDecimalEnabled =>
+ // FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019
+ willNotWorkOnGpu("Currently string to decimal type on the GPU might produce " +
+ "results which slightly differed from the correct results when the string represents " +
+ "any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For " +
+ "instance, the GPU returns 99999999999999987 given input string " +
+ "\"99999999999999999\". The cause of divergence is that we can not cast strings " +
+ "containing scientific notation to decimal directly. So, we have to cast strings " +
+ "to floats firstly. Then, cast floats to decimals. The first step may lead to " +
+ "precision loss. To enable this operation on the GPU, set " +
+ s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.")
+ case (structType: StructType, StringType) =>
+ structType.foreach { field =>
+ recursiveTagExprForGpuCheck(field.dataType, StringType)
- }
- }
+ case (fromStructType: StructType, toStructType: StructType) =>
+ fromStructType.zip(toStructType).foreach {
+ case (fromChild, toChild) =>
+ recursiveTagExprForGpuCheck(fromChild.dataType, toChild.dataType)
+ }
+ case (ArrayType(nestedFrom, _), ArrayType(nestedTo, _)) =>
+ recursiveTagExprForGpuCheck(nestedFrom, nestedTo)
- (fromDataType, toDataType) match {
- case (
- ArrayType(nestedFrom@(
- FloatType |
- DoubleType |
- IntegerType |
- ArrayType(_, _)), _),
- ArrayType(nestedTo@(
- FloatType |
- DoubleType |
- IntegerType |
- ArrayType(_, _)), _)) => recursiveTagExprForGpuCheck(nestedFrom, nestedTo)
- case (nestedFrom@ArrayType(_, _), nestedTo@ArrayType(_, _)) =>
- willNotWorkOnGpu(s"casting from $nestedFrom to $nestedTo is not supported")
- case _ => ()
+ case (MapType(keyFrom, valueFrom, _), MapType(keyTo, valueTo, _)) =>
+ recursiveTagExprForGpuCheck(keyFrom, keyTo)
+ recursiveTagExprForGpuCheck(valueFrom, valueTo)
+ case _ =>
@@ -155,10 +133,10 @@ object GpuCast extends Arm {
"[0-9]{2}:[0-9]{2}:[0-9]{2})" +
- val INVALID_INPUT_MESSAGE = "Column contains at least one value that is not in the " +
+ val INVALID_INPUT_MESSAGE: String = "Column contains at least one value that is not in the " +
"required range"
- val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number"
+ val INVALID_FLOAT_CAST_MSG: String = "At least one value is either null or is an invalid number"
* Sanitization step for CAST string to date. Inserts a single zero before any
@@ -307,101 +285,29 @@ object GpuCast extends Arm {
- * Casts using the GPU
- */
-case class GpuCast(
- child: Expression,
- dataType: DataType,
- ansiMode: Boolean = false,
- timeZoneId: Option[String] = None,
- legacyCastToString: Boolean = false)
- extends GpuUnaryExpression with TimeZoneAwareExpression with NullIntolerant {
- import GpuCast._
- override def toString: String = if (ansiMode) {
- s"ansi_cast($child as ${dataType.simpleString})"
- } else {
- s"cast($child as ${dataType.simpleString})"
- }
- override def checkInputDataTypes(): TypeCheckResult = {
- if (Cast.canCast(child.dataType, dataType)) {
- TypeCheckResult.TypeCheckSuccess
- } else {
- TypeCheckResult.TypeCheckFailure(
- s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}")
- }
- }
- override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable
- override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
- copy(timeZoneId = Option(timeZoneId))
- /**
- * Under certain conditions during hash partitioning, Spark will attempt to replace casts
- * with semantically equivalent expressions. This method is overridden to prevent Spark
- * from substituting non-GPU expressions.
- */
- override def semanticEquals(other: Expression): Boolean = other match {
- case g: GpuExpression =>
- if (this == g) {
- true
- } else {
- super.semanticEquals(g)
- }
- case _ => false
- }
- // When this cast involves TimeZone, it's only resolved if the timeZoneId is set;
- // Otherwise behave like Expression.resolved.
- override lazy val resolved: Boolean =
- childrenResolved && checkInputDataTypes().isSuccess && (!needsTimeZone || timeZoneId.isDefined)
- private[this] def needsTimeZone: Boolean = Cast.needsTimeZone(child.dataType, dataType)
- override def sql: String = dataType match {
- // HiveQL doesn't allow casting to complex types. For logical plans translated from HiveQL,
- // this type of casting can only be introduced by the analyzer, and can be omitted when
- // converting back to SQL query string.
- case _: ArrayType | _: MapType | _: StructType => child.sql
- case _ => s"CAST(${child.sql} AS ${dataType.sql})"
- }
- override def doColumnar(input: GpuColumnVector): ColumnVector = {
- (input.dataType(), dataType) match {
- // Filter out casts to Decimal that utilize the ColumnVector to avoid a copy
- case (ShortType | IntegerType | LongType, dt: DecimalType) =>
- castIntegralsToDecimal(input.getBase, dt)
- case (FloatType | DoubleType, dt: DecimalType) =>
- castFloatsToDecimal(input.getBase, dt)
- case (from: DecimalType, to: DecimalType) =>
- castDecimalToDecimal(input.getBase, from, to)
- case _ =>
- recursiveDoColumnar(input.getBase, input.dataType())
- }
- }
private def recursiveDoColumnar(
input: ColumnView,
fromDataType: DataType,
- toDataType: DataType = dataType)
- : ColumnVector = {
+ toDataType: DataType,
+ ansiMode: Boolean,
+ legacyCastToString: Boolean): ColumnVector = {
+ if (DataType.equalsStructurally(fromDataType, toDataType)) {
+ return input.copyToColumnVector()
+ }
(fromDataType, toDataType) match {
case (NullType, to) =>
GpuColumnVector.columnVectorFromNull(input.getRowCount.toInt, to)
case (DateType, BooleanType | _: NumericType) =>
// casts from date type to numerics are always null
GpuColumnVector.columnVectorFromNull(input.getRowCount.toInt, toDataType)
case (DateType, StringType) =>
case (TimestampType, FloatType | DoubleType) =>
withResource(input.castTo(DType.INT64)) { asLongs =>
withResource(Scalar.fromDouble(1000000)) { microsPerSec =>
@@ -440,8 +346,9 @@ case class GpuCast(
case (TimestampType, StringType) =>
case (StructType(fields), StringType) =>
- castStructToString(input, fields)
+ castStructToString(input, fields, ansiMode, legacyCastToString)
// ansi cast from larger-than-integer integral types, to integer
case (LongType, IntegerType) if ansiMode =>
@@ -492,7 +399,7 @@ case class GpuCast(
withResource(FloatUtils.infinityToNulls(inputWithNansToNull)) {
inputWithoutNanAndInfinity =>
if (fromDataType == FloatType &&
- ShimLoader.getSparkShims.hasCastFloatTimestampUpcast) {
+ ShimLoader.getSparkShims.hasCastFloatTimestampUpcast) {
withResource(inputWithoutNanAndInfinity.castTo(DType.FLOAT64)) { doubles =>
withResource(doubles.mul(microsPerSec, DType.INT64)) {
inputTimesMicrosCv =>
@@ -508,6 +415,10 @@ case class GpuCast(
+ case (FloatType | DoubleType, dt: DecimalType) =>
+ castFloatsToDecimal(input, dt, ansiMode)
+ case (from: DecimalType, to: DecimalType) =>
+ castDecimalToDecimal(input, from, to, ansiMode)
case (BooleanType, TimestampType) =>
// cudf requires casting to a long first.
withResource(input.castTo(DType.INT64)) { longs =>
@@ -544,7 +455,7 @@ case class GpuCast(
case DateType =>
case TimestampType =>
- castStringToTimestamp(trimmed)
+ castStringToTimestamp(trimmed, ansiMode)
case FloatType | DoubleType =>
castStringToFloats(trimmed, ansiMode,
@@ -558,45 +469,44 @@ case class GpuCast(
// string to fp64. Then, cast fp64 to target decimal type to enforce HALF_UP rounding.
withResource(input.strip()) { trimmed =>
withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp =>
- castFloatsToDecimal(fp, dt)
+ castFloatsToDecimal(fp, dt, ansiMode)
+ case (ShortType | IntegerType | LongType, dt: DecimalType) =>
+ castIntegralsToDecimal(input, dt, ansiMode)
case (ShortType | IntegerType | LongType | ByteType | StringType, BinaryType) =>
case (ShortType | IntegerType | LongType, dt: DecimalType) =>
withResource(input.copyToColumnVector()) { inputVector =>
- castIntegralsToDecimal(inputVector, dt)
+ castIntegralsToDecimal(inputVector, dt, ansiMode)
case (FloatType | DoubleType, dt: DecimalType) =>
withResource(input.copyToColumnVector()) { inputVector =>
- castFloatsToDecimal(inputVector, dt)
+ castFloatsToDecimal(inputVector, dt, ansiMode)
case (from: DecimalType, to: DecimalType) =>
- castDecimalToDecimal(input.copyToColumnVector(), from, to)
+ castDecimalToDecimal(input.copyToColumnVector(), from, to, ansiMode)
case (_: DecimalType, StringType) =>
- case (
- ArrayType(nestedFrom@(
- FloatType |
- DoubleType |
- IntegerType |
- ArrayType(_, _)), _),
- ArrayType(nestedTo@(
- FloatType |
- DoubleType |
- IntegerType |
- ArrayType(_, _)), _)) => {
- withResource(input.getChildColumnView(0))(childView =>
- withResource(recursiveDoColumnar(childView, nestedFrom, nestedTo))(childColumnVector =>
- withResource(input.replaceListChild(childColumnVector))(_.copyToColumnVector())))
- }
+ case (ArrayType(nestedFrom, _), ArrayType(nestedTo, _)) =>
+ withResource(input.getChildColumnView(0)) { childView =>
+ withResource(recursiveDoColumnar(childView, nestedFrom, nestedTo,
+ ansiMode, legacyCastToString)) { childColumnVector =>
+ withResource(input.replaceListChild(childColumnVector))(_.copyToColumnVector())
+ }
+ }
+ case (from: StructType, to: StructType) =>
+ castStructToStruct(from, to, input, ansiMode, legacyCastToString)
+ case (from: MapType, to: MapType) =>
+ castMapToMap(from, to, input, ansiMode, legacyCastToString)
case _ =>
@@ -614,10 +524,10 @@ case class GpuCast(
* @throws IllegalStateException if any values in the column are not within the specified range
private def assertValuesInRange(values: ColumnView,
- minValue: => Scalar,
- maxValue: => Scalar,
- inclusiveMin: Boolean = true,
- inclusiveMax: Boolean = true): Unit = {
+ minValue: => Scalar,
+ maxValue: => Scalar,
+ inclusiveMin: Boolean = true,
+ inclusiveMax: Boolean = true): Unit = {
def throwIfAny(cv: ColumnView): Unit = {
withResource(cv) { cv =>
@@ -657,12 +567,12 @@ case class GpuCast(
* @param inclusiveMin Whether the min value is included in the valid range or not
* @param inclusiveMax Whether the max value is included in the valid range or not
- private def replaceOutOfRangeValues(values: ColumnVector,
- minValue: => Scalar,
- maxValue: => Scalar,
- replaceValue: => Scalar,
- inclusiveMin: Boolean = true,
- inclusiveMax: Boolean = true): ColumnVector = {
+ private def replaceOutOfRangeValues(values: ColumnView,
+ minValue: => Scalar,
+ maxValue: => Scalar,
+ replaceValue: => Scalar,
+ inclusiveMin: Boolean = true,
+ inclusiveMax: Boolean = true): ColumnVector = {
withResource(minValue) { minValue =>
withResource(maxValue) { maxValue =>
@@ -697,8 +607,11 @@ case class GpuCast(
- private def castStructToString(input: ColumnView,
- inputSchema: Array[StructField]): ColumnVector = {
+ private def castStructToString(
+ input: ColumnView,
+ inputSchema: Array[StructField],
+ ansiMode: Boolean,
+ legacyCastToString: Boolean): ColumnVector = {
val (leftStr, rightStr) = if (legacyCastToString) ("[", "]") else ("{", "}")
val emptyStr = ""
@@ -709,18 +622,19 @@ case class GpuCast(
val numInputColumns = input.getNumChildren
def doCastStructToString(
- emptyScalar: Scalar,
- nullScalar: Scalar,
- sepColumn: ColumnVector,
- spaceColumn: ColumnVector,
- leftColumn: ColumnVector,
- rightColumn: ColumnVector) = {
+ emptyScalar: Scalar,
+ nullScalar: Scalar,
+ sepColumn: ColumnVector,
+ spaceColumn: ColumnVector,
+ leftColumn: ColumnVector,
+ rightColumn: ColumnVector): ColumnVector = {
withResource(ArrayBuffer.empty[ColumnVector]) { columns =>
// legacy: [firstCol
// 3.1+: {firstCol
columns += leftColumn.incRefCount()
withResource(input.getChildColumnView(0)) { firstColumnView =>
- columns += recursiveDoColumnar(firstColumnView, inputSchema.head.dataType)
+ columns += recursiveDoColumnar(firstColumnView, inputSchema.head.dataType, StringType,
+ ansiMode, legacyCastToString)
for (nonFirstIndex <- 1 until numInputColumns) {
withResource(input.getChildColumnView(nonFirstIndex)) { nonFirstColumnView =>
@@ -728,7 +642,7 @@ case class GpuCast(
// 3.1+: ", "
columns += sepColumn.incRefCount()
val nonFirstColumn = recursiveDoColumnar(nonFirstColumnView,
- inputSchema(nonFirstIndex).dataType)
+ inputSchema(nonFirstIndex).dataType, StringType, ansiMode, legacyCastToString)
if (legacyCastToString) {
// " " if non-null
columns += spaceColumn.mergeAndSetValidity(BinaryOp.BITWISE_AND, nonFirstColumnView)
@@ -744,18 +658,17 @@ case class GpuCast(
- withResource(
- Seq(emptyStr, nullStr, separatorStr, spaceStr, leftStr, rightStr)
- .safeMap(Scalar.fromString _)
- ) { case Seq(emptyScalar, nullScalar, columnScalars@_*) =>
+ withResource(Seq(emptyStr, nullStr, separatorStr, spaceStr, leftStr, rightStr)
+ .safeMap(Scalar.fromString)) {
+ case Seq(emptyScalar, nullScalar, columnScalars@_*) =>
- withResource(
- columnScalars.safeMap(s => ColumnVector.fromScalar(s, numRows))
- ) { case Seq(sepColumn, spaceColumn, leftColumn, rightColumn) =>
+ withResource(
+ columnScalars.safeMap(s => ColumnVector.fromScalar(s, numRows))
+ ) { case Seq(sepColumn, spaceColumn, leftColumn, rightColumn) =>
- doCastStructToString(emptyScalar, nullScalar, sepColumn,
- spaceColumn, leftColumn, rightColumn)
- }
+ doCastStructToString(emptyScalar, nullScalar, sepColumn,
+ spaceColumn, leftColumn, rightColumn)
+ }
@@ -829,7 +742,7 @@ case class GpuCast(
withResource(sanitized.castTo(dType)) { parsedInt =>
- withResource(GpuScalar.from(null, dataType)) { nullVal =>
+ withResource(Scalar.fromNull(dType)) { nullVal =>
isInt.ifElse(parsedInt, nullVal)
@@ -1102,7 +1015,7 @@ case class GpuCast(
- private def castStringToTimestamp(input: ColumnVector): ColumnVector = {
+ private def castStringToTimestamp(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
// special timestamps
val today = DateUtils.currentDate()
@@ -1191,8 +1104,87 @@ case class GpuCast(
- private def castIntegralsToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = {
+ private def castMapToMap(
+ from: MapType,
+ to: MapType,
+ input: ColumnView,
+ ansiMode: Boolean,
+ legacyCastToString: Boolean): ColumnVector = {
+ // For cudf a map is a list of (key, value) structs, but lets keep it in ColumnView as much
+ // as possible
+ withResource(input.getChildColumnView(0)) { kvStructColumn =>
+ val castKey = withResource(kvStructColumn.getChildColumnView(0)) { keyColumn =>
+ recursiveDoColumnar(keyColumn, from.keyType, to.keyType, ansiMode, legacyCastToString)
+ }
+ withResource(castKey) { castKey =>
+ val castValue = withResource(kvStructColumn.getChildColumnView(1)) { valueColumn =>
+ recursiveDoColumnar(valueColumn, from.valueType, to.valueType,
+ ansiMode, legacyCastToString)
+ }
+ withResource(castValue) { castValue =>
+ withResource(ColumnView.makeStructView(castKey, castValue)) { castKvStructColumn =>
+ // We don't have to worry about null in the key/value struct because they are not
+ // allowed for maps in Spark
+ withResource(input.replaceListChild(castKvStructColumn)) { replacedView =>
+ replacedView.copyToColumnVector()
+ }
+ }
+ }
+ }
+ }
+ }
+ private def castStructToStruct(
+ from: StructType,
+ to: StructType,
+ input: ColumnView,
+ ansiMode: Boolean,
+ legacyCastToString: Boolean): ColumnVector = {
+ withResource(new ArrayBuffer[ColumnVector](from.length)) { childColumns =>
+ from.indices.foreach { index =>
+ childColumns += recursiveDoColumnar(
+ input.getChildColumnView(index),
+ from(index).dataType,
+ to(index).dataType,
+ ansiMode,
+ legacyCastToString)
+ }
+ withResource(ColumnView.makeStructView(childColumns: _*)) { casted =>
+ if (input.getNullCount == 0) {
+ casted.copyToColumnVector()
+ } else {
+ withResource(input.isNull) { isNull =>
+ withResource(GpuScalar.from(null, to)) { nullVal =>
+ isNull.ifElse(nullVal, casted)
+ }
+ }
+ }
+ }
+ }
+ }
+ private def castIntegralsToDecimalAfterCheck(input: ColumnView, dt: DecimalType): ColumnVector = {
+ if (dt.scale < 0) {
+ // Rounding is essential when scale is negative,
+ // so we apply HALF_UP rounding manually to keep align with CpuCast.
+ withResource(input.castTo(DecimalUtil.createCudfDecimal(dt.precision, 0))) {
+ scaleZero => scaleZero.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP)
+ }
+ } else if (dt.scale > 0) {
+ // Integer will be enlarged during casting if scale > 0, so we cast input to INT64
+ // before casting it to decimal in case of overflow.
+ withResource(input.castTo(DType.INT64)) { long =>
+ long.castTo(DecimalUtil.createCudfDecimal(dt.precision, dt.scale))
+ }
+ } else {
+ input.castTo(DecimalUtil.createCudfDecimal(dt.precision, dt.scale))
+ }
+ }
+ private def castIntegralsToDecimal(
+ input: ColumnView,
+ dt: DecimalType,
+ ansiMode: Boolean): ColumnVector = {
// Use INT64 bounds instead of FLOAT64 bounds, which enables precise comparison.
val (lowBound, upBound) = math.pow(10, dt.precision - dt.scale) match {
case bound if bound > Long.MaxValue => (Long.MinValue, Long.MaxValue)
@@ -1200,38 +1192,26 @@ case class GpuCast(
// At first, we conduct overflow check onto input column.
// Then, we cast checked input into target decimal type.
- val checkedInput = if (ansiMode) {
+ if (ansiMode) {
minValue = Scalar.fromLong(lowBound),
maxValue = Scalar.fromLong(upBound))
- input.incRefCount()
+ castIntegralsToDecimalAfterCheck(input, dt)
} else {
- replaceOutOfRangeValues(input,
+ val checkedInput = replaceOutOfRangeValues(input,
minValue = Scalar.fromLong(lowBound),
maxValue = Scalar.fromLong(upBound),
replaceValue = Scalar.fromNull(input.getType))
- }
- withResource(checkedInput) { checked =>
- if (dt.scale < 0) {
- // Rounding is essential when scale is negative,
- // so we apply HALF_UP rounding manually to keep align with CpuCast.
- withResource(checked.castTo(DecimalUtil.createCudfDecimal(dt.precision, 0))) {
- scaleZero => scaleZero.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP)
- }
- } else if (dt.scale > 0) {
- // Integer will be enlarged during casting if scale > 0, so we cast input to INT64
- // before casting it to decimal in case of overflow.
- withResource(checked.castTo(DType.INT64)) { long =>
- long.castTo(DecimalUtil.createCudfDecimal(dt.precision, dt.scale))
- }
- } else {
- checked.castTo(DecimalUtil.createCudfDecimal(dt.precision, dt.scale))
+ withResource(checkedInput) { checked =>
+ castIntegralsToDecimalAfterCheck(checked, dt)
- private def castFloatsToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = {
+ private def castFloatsToDecimal(
+ input: ColumnView,
+ dt: DecimalType,
+ ansiMode: Boolean): ColumnVector = {
// Approach to minimize difference between CPUCast and GPUCast:
// step 1. cast input to FLOAT64 (if necessary)
@@ -1269,7 +1249,7 @@ case class GpuCast(
val casted = if (DType.DECIMAL64_MAX_PRECISION == dt.scale) {
} else {
- val containerType = DecimalUtil.createCudfDecimal(dt.precision, (dt.scale + 1))
+ val containerType = DecimalUtil.createCudfDecimal(dt.precision, dt.scale + 1)
withResource(checked.castTo(containerType)) { container =>
container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP)
@@ -1285,18 +1265,21 @@ case class GpuCast(
- private def castDecimalToDecimal(input: ColumnVector,
+ private def castDecimalToDecimal(
+ input: ColumnView,
from: DecimalType,
- to: DecimalType): ColumnVector = {
+ to: DecimalType,
+ ansiMode: Boolean): ColumnVector = {
val isFrom32Bit = DecimalType.is32BitDecimalType(from)
val isTo32Bit = DecimalType.is32BitDecimalType(to)
val cudfDecimal = DecimalUtil.createCudfDecimal(to.precision, to.scale)
- def castCheckedDecimal(checkedInput: ColumnVector): ColumnVector = {
+ def castCheckedDecimal(checkedInput: ColumnView): ColumnVector = {
if (to.scale == from.scale) {
if (isFrom32Bit == isTo32Bit) {
- checkedInput.incRefCount()
+ // If the input is a ColumnVector already this will just inc the reference count
+ checkedInput.copyToColumnVector()
} else {
// the input is already checked, just cast it
@@ -1304,38 +1287,39 @@ case class GpuCast(
} else if (to.scale > from.scale) {
} else {
- withResource(checkedInput.round(to.scale, ai.rapids.cudf.RoundMode.HALF_UP)) {
- rounded => rounded.castTo(cudfDecimal)
- }
+ withResource(checkedInput.round(to.scale, ai.rapids.cudf.RoundMode.HALF_UP)) {
+ rounded => rounded.castTo(cudfDecimal)
+ }
if (to.scale <= from.scale) {
if (!isFrom32Bit && isTo32Bit) {
// check for overflow when 64bit => 32bit
- withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput =>
+ withResource(checkForOverflow(input, to, isFrom32Bit, ansiMode)) { checkedInput =>
} else {
if (to.scale < 0 && !SQLConf.get.allowNegativeScaleOfDecimalEnabled) {
throw new IllegalStateException(s"Negative scale is not allowed: ${to.scale}. " +
- s"You can use spark.sql.legacy.allowNegativeScaleOfDecimal=true " +
- s"to enable legacy mode to allow it.")
+ s"You can use spark.sql.legacy.allowNegativeScaleOfDecimal=true " +
+ s"to enable legacy mode to allow it.")
} else {
// from.scale > to.scale
- withResource(checkForOverflow(input, to, isFrom32Bit)) { checkedInput =>
+ withResource(checkForOverflow(input, to, isFrom32Bit, ansiMode)) { checkedInput =>
def checkForOverflow(
- input: ColumnVector,
- to: DecimalType,
- isFrom32Bit: Boolean): ColumnVector = {
+ input: ColumnView,
+ to: DecimalType,
+ isFrom32Bit: Boolean,
+ ansiMode: Boolean): ColumnVector = {
// Decimal numbers in general terms have two parts, a part before decimal (whole number)
// and a part after decimal (fractional number)
@@ -1377,7 +1361,7 @@ case class GpuCast(
// if (isFrom32Bit && prec > Decimal.MAX_INT_DIGITS ||
// !isFrom32Bit && prec > Decimal.MAX_LONG_DIGITS)
if (isFrom32Bit && absBoundPrecision > Decimal.MAX_INT_DIGITS) {
- return input.incRefCount()
+ return input.copyToColumnVector()
val (minValueScalar, maxValueScalar) = if (!isFrom32Bit) {
val absBound = math.pow(10, absBoundPrecision).toLong
@@ -1391,7 +1375,7 @@ case class GpuCast(
minValue = minValueScalar,
maxValue = maxValueScalar,
inclusiveMin = false, inclusiveMax = false)
- input.incRefCount()
+ input.copyToColumnVector()
} else {
minValue = minValueScalar,
@@ -1403,3 +1387,71 @@ case class GpuCast(
+ * Casts using the GPU
+ */
+case class GpuCast(
+ child: Expression,
+ dataType: DataType,
+ ansiMode: Boolean = false,
+ timeZoneId: Option[String] = None,
+ legacyCastToString: Boolean = false)
+ extends GpuUnaryExpression with TimeZoneAwareExpression with NullIntolerant {
+ import GpuCast._
+ override def toString: String = if (ansiMode) {
+ s"ansi_cast($child as ${dataType.simpleString})"
+ } else {
+ s"cast($child as ${dataType.simpleString})"
+ }
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (Cast.canCast(child.dataType, dataType)) {
+ TypeCheckResult.TypeCheckSuccess
+ } else {
+ TypeCheckResult.TypeCheckFailure(
+ s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}")
+ }
+ }
+ override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+ /**
+ * Under certain conditions during hash partitioning, Spark will attempt to replace casts
+ * with semantically equivalent expressions. This method is overridden to prevent Spark
+ * from substituting non-GPU expressions.
+ */
+ override def semanticEquals(other: Expression): Boolean = other match {
+ case g: GpuExpression =>
+ if (this == g) {
+ true
+ } else {
+ super.semanticEquals(g)
+ }
+ case _ => false
+ }
+ // When this cast involves TimeZone, it's only resolved if the timeZoneId is set;
+ // Otherwise behave like Expression.resolved.
+ override lazy val resolved: Boolean =
+ childrenResolved && checkInputDataTypes().isSuccess && (!needsTimeZone || timeZoneId.isDefined)
+ private[this] def needsTimeZone: Boolean = Cast.needsTimeZone(child.dataType, dataType)
+ override def sql: String = dataType match {
+ // HiveQL doesn't allow casting to complex types. For logical plans translated from HiveQL,
+ // this type of casting can only be introduced by the analyzer, and can be omitted when
+ // converting back to SQL query string.
+ case _: ArrayType | _: MapType | _: StructType => child.sql
+ case _ => s"CAST(${child.sql} AS ${dataType.sql})"
+ }
+ override def doColumnar(input: GpuColumnVector): ColumnVector =
+ recursiveDoColumnar(input.getBase, input.dataType(), dataType, ansiMode, legacyCastToString)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
index f74d0ae5ebf..fe5d03ada0c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -1046,41 +1046,50 @@ class CastChecks extends ExprChecks {
val sparkNullSig: TypeSig = all
val booleanChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + STRING
- val sparkBooleanSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + STRING
+ val sparkBooleanSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + STRING
val integralChecks: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + STRING + BINARY
- val sparkIntegralSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + STRING + BINARY
+ val sparkIntegralSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + STRING + BINARY
val fpChecks: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + STRING
- val sparkFpSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + STRING
+ val sparkFpSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + STRING
val dateChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING
- val sparkDateSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + DATE + STRING
+ val sparkDateSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING
val timestampChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING
- val sparkTimestampSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + DATE + STRING
+ val sparkTimestampSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING
val stringChecks: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY
- val sparkStringSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + DATE + CALENDAR + STRING + BINARY
+ val sparkStringSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + CALENDAR + STRING + BINARY
val binaryChecks: TypeSig = none
val sparkBinarySig: TypeSig = STRING + BINARY
val decimalChecks: TypeSig = DECIMAL_64 + STRING
- val sparkDecimalSig: TypeSig = gpuNumeric + BOOLEAN + TIMESTAMP + STRING
+ val sparkDecimalSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + STRING
val calendarChecks: TypeSig = none
val sparkCalendarSig: TypeSig = CALENDAR + STRING
- val arrayChecks: TypeSig = ARRAY.nested(FLOAT + DOUBLE + INT + ARRAY)
+ val arrayChecks: TypeSig = ARRAY.nested(commonCudfTypes + DECIMAL_64 + NULL +
+ psNote(TypeEnum.ARRAY, "The array's child type must also support being cast to " +
+ "the desired child type")
val sparkArraySig: TypeSig = STRING + ARRAY.nested(all)
- val mapChecks: TypeSig = none
+ val mapChecks: TypeSig = MAP.nested(commonCudfTypes + DECIMAL_64 + NULL + ARRAY + BINARY +
+ psNote(TypeEnum.MAP, "the map's key and value must also support being cast to the " +
+ "desired child types")
val sparkMapSig: TypeSig = STRING + MAP.nested(all)
val structChecks: TypeSig = psNote(TypeEnum.STRING, "the struct's children must also support " +
- "being cast to string")
+ "being cast to string") +
+ STRUCT.nested(commonCudfTypes + DECIMAL_64 + NULL + ARRAY + BINARY + STRUCT + MAP) +
+ psNote(TypeEnum.STRUCT, "the struct's children must also support being cast to the " +
+ "desired child type(s)")
val sparkStructSig: TypeSig = STRING + STRUCT.nested(all)
val udtChecks: TypeSig = none