diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py
index 9e2e98006ab..dcab93e6f41 100644
--- a/integration_tests/src/main/python/date_time_test.py
+++ b/integration_tests/src/main/python/date_time_test.py
@@ -26,40 +26,58 @@
 non_utc_tz_allow = ['ProjectExec'] if not is_utc() else []
 # Others work in all supported time zones
 non_supported_tz_allow = ['ProjectExec'] if not is_supported_time_zone() else []
+non_supported_tz_allow_filter = ['ProjectExec', 'FilterExec'] if not is_supported_time_zone() else []
 
 # We only support literal intervals for TimeSub
 vals = [(-584, 1563), (1943, 1101), (2693, 2167), (2729, 0), (44, 1534), (2635, 3319),
-        (1885, -2828), (0, 2463), (932, 2286), (0, 0)]
+        (1885, -2828), (0, 2463), (932, 2286), (0, 0), (0, 86400), (1, 86401), (1, 8640000)]
 
 @pytest.mark.parametrize('data_gen', vals, ids=idfn)
-@allow_non_gpu(*non_utc_allow)
+@allow_non_gpu(*non_supported_tz_allow)
 def test_timesub(data_gen):
     days, seconds = data_gen
     assert_gpu_and_cpu_are_equal_collect(
-        # We are starting at year 0015 to make sure we don't go before year 0001 while doing TimeSub
-        lambda spark: unary_op_df(spark, TimestampGen(start=datetime(15, 1, 1, tzinfo=timezone.utc)), seed=1)
+        lambda spark: unary_op_df(spark, TimestampGen())
            .selectExpr("a - (interval {} days {} seconds)".format(days, seconds)))
 
 @pytest.mark.parametrize('data_gen', vals, ids=idfn)
-@allow_non_gpu(*non_utc_allow)
+@allow_non_gpu(*non_supported_tz_allow)
 def test_timeadd(data_gen):
     days, seconds = data_gen
     assert_gpu_and_cpu_are_equal_collect(
-        # We are starting at year 0005 to make sure we don't go before year 0001
-        # and beyond year 10000 while doing TimeAdd
-        lambda spark: unary_op_df(spark, TimestampGen(start=datetime(5, 1, 1, tzinfo=timezone.utc), end=datetime(15, 1, 1, tzinfo=timezone.utc)), seed=1)
+        lambda spark: unary_op_df(spark, TimestampGen())
            .selectExpr("a + (interval {} days {} seconds)".format(days, seconds)))
 
+@pytest.mark.parametrize('edge_vals', [-pow(2, 63), pow(2, 63)], ids=idfn)
+@allow_non_gpu(*non_supported_tz_allow)
+def test_timeadd_long_overflow(edge_vals):
+    assert_gpu_and_cpu_error(
+        lambda spark: unary_op_df(spark, TimestampGen())
+            .selectExpr("a + (interval {} microseconds)".format(edge_vals)).collect(),
+        conf={},
+        error_message='long overflow')
+
 @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
-@allow_non_gpu(*non_utc_allow)
+@allow_non_gpu(*non_supported_tz_allow)
 def test_timeadd_daytime_column():
     gen_list = [
-        # timestamp column max year is 1000
-        ('t', TimestampGen(end=datetime(1000, 1, 1, tzinfo=timezone.utc))),
+        # timestamp column max year is 2000
+        ('t', TimestampGen(end=datetime(2000, 1, 1, tzinfo=timezone.utc))),
         # max days is 8000 year, so added result will not be out of range
-        ('d', DayTimeIntervalGen(min_value=timedelta(days=0), max_value=timedelta(days=8000 * 365)))]
+        ('d', DayTimeIntervalGen(min_value=timedelta(days=-1000 * 365), max_value=timedelta(days=8000 * 365)))]
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: gen_df(spark, gen_list).selectExpr("t + d", "t + INTERVAL '1 02:03:04' DAY TO SECOND"))
 
+@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
+@allow_non_gpu(*non_supported_tz_allow)
+def test_timeadd_daytime_column_long_overflow():
+    overflow_gen = SetValuesGen(DayTimeIntervalType(),
+        [timedelta(microseconds=-pow(2, 63)), timedelta(microseconds=(pow(2, 63) - 1))])
+    gen_list = [('t', TimestampGen()), ('d', overflow_gen)]
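+    # Either extreme interval value is expected to overflow the underlying long
+    # arithmetic and surface as a 'long overflow' error on both CPU and GPU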
+ assert_gpu_and_cpu_error( + lambda spark : gen_df(spark, gen_list).selectExpr("t + d").collect(), + conf={}, + error_message='long overflow') + @pytest.mark.skipif(is_before_spark_350(), reason='DayTimeInterval overflow check for seconds is not supported before Spark 3.5.0') def test_interval_seconds_overflow_exception(): assert_gpu_and_cpu_error( @@ -68,7 +86,7 @@ def test_interval_seconds_overflow_exception(): error_message="IllegalArgumentException") @pytest.mark.parametrize('data_gen', vals, ids=idfn) -@allow_non_gpu(*non_utc_allow) +@allow_non_gpu(*non_supported_tz_allow_filter) def test_timeadd_from_subquery(data_gen): def fun(spark): @@ -80,7 +98,7 @@ def fun(spark): assert_gpu_and_cpu_are_equal_collect(fun) @pytest.mark.parametrize('data_gen', vals, ids=idfn) -@allow_non_gpu(*non_utc_allow) +@allow_non_gpu(*non_supported_tz_allow) def test_timesub_from_subquery(data_gen): def fun(spark): @@ -135,19 +153,17 @@ def test_datediff(data_gen): 'datediff(a, date(null))', 'datediff(a, \'2016-03-02\')')) -hms_fallback = ['ProjectExec'] if not is_supported_time_zone() else [] - -@allow_non_gpu(*hms_fallback) +@allow_non_gpu(*non_supported_tz_allow) def test_hour(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('hour(a)')) -@allow_non_gpu(*hms_fallback) +@allow_non_gpu(*non_supported_tz_allow) def test_minute(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('minute(a)')) -@allow_non_gpu(*hms_fallback) +@allow_non_gpu(*non_supported_tz_allow) def test_second(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('second(a)')) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index b330eb5b52d..9266d39f1d6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1646,6 +1646,8 @@ object GpuOverrides extends Logging { .withPsNote(TypeEnum.CALENDAR, "month intervals are not supported"), TypeSig.CALENDAR)), (timeAdd, conf, p, r) => new BinaryExprMeta[TimeAdd](timeAdd, conf, p, r) { + override def isTimeZoneSupported = true + override def tagExprForGpu(): Unit = { GpuOverrides.extractLit(timeAdd.interval).foreach { lit => val intvl = lit.value.asInstanceOf[CalendarInterval] @@ -1656,7 +1658,7 @@ object GpuOverrides extends Logging { } override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuTimeAdd(lhs, rhs) + GpuTimeAdd(lhs, rhs, timeAdd.timeZoneId) }), expr[DateAddInterval]( "Adds interval to date", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala index 1bcdc58c612..f4f18491745 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala @@ -473,7 +473,7 @@ object GpuScalar extends Logging { * * This class is introduced because many expressions require both the cudf Scalar and its * corresponding Scala value to complete their computations. e.g. 'GpuStringSplit', - * 'GpuStringLocate', 'GpuDivide', 'GpuDateAddInterval', 'GpuTimeMath' ... + * 'GpuStringLocate', 'GpuDivide', 'GpuDateAddInterval', 'GpuTimeAdd' ... 
 * So only either a cudf Scalar or a Scala value can not support such cases, unless copying data
 * between the host and the device each time being asked for.
 *
@@ -493,7 +493,7 @@ object GpuScalar extends Logging {
 * happens.
 *
 * Another reason why storing the Scala value in addition to the cudf Scalar is
- * `GpuDateAddInterval` and 'GpuTimeMath' have different algorithms with the 3 members of
+ * `GpuDateAddInterval` and 'GpuTimeAdd' have different algorithms with the 3 members of
 * a `CalendarInterval`, which can not be supported by a single cudf Scalar now.
 *
 * Do not create a GpuScalar from the constructor, instead call the factory APIs above.
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
index 3e96fa7d419..2954046aa55 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -37,7 +37,8 @@ object AddOverflowChecks {
   def basicOpOverflowCheck(
       lhs: BinaryOperable,
       rhs: BinaryOperable,
-      ret: ColumnVector): Unit = {
+      ret: ColumnVector,
+      msg: String = "One or more rows overflow for Add operation."): Unit = {
     // Check overflow. It is true if the arguments have different signs and
     // the sign of the result is different from the sign of x.
     // Which is equal to "((x ^ r) & (y ^ r)) < 0" in the form of arithmetic.
@@ -54,9 +55,7 @@
     withResource(signDiffCV) { signDiff =>
       withResource(signDiff.any()) { any =>
         if (any.isValid && any.getBoolean) {
-          throw RapidsErrorUtils.arithmeticOverflowError(
-            "One or more rows overflow for Add operation."
-          )
+          throw RapidsErrorUtils.arithmeticOverflowError(msg)
         }
       }
     }
@@ -114,7 +113,8 @@ object SubtractOverflowChecks {
   def basicOpOverflowCheck(
       lhs: BinaryOperable,
       rhs: BinaryOperable,
-      ret: ColumnVector): Unit = {
+      ret: ColumnVector,
+      msg: String = "One or more rows overflow for Subtract operation."): Unit = {
     // Check overflow. It is true if the arguments have different signs and
     // the sign of the result is different from the sign of x.
     // Which is equal to "((x ^ y) & (x ^ r)) < 0" in the form of arithmetic.
@@ -131,8 +131,7 @@
     withResource(signDiffCV) { signDiff =>
      withResource(signDiff.any()) { any =>
        if (any.isValid && any.getBoolean) {
-          throw RapidsErrorUtils.
- arithmeticOverflowError("One or more rows overflow for Subtract operation.") + throw RapidsErrorUtils.arithmeticOverflowError(msg) } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 3169d6bc543..3e920363413 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -140,78 +140,33 @@ case class GpuYear(child: Expression) extends GpuDateUnaryExpression { input.getBase.year() } -abstract class GpuTimeMath( - start: Expression, +case class GpuDateAddInterval(start: Expression, interval: Expression, - timeZoneId: Option[String] = None) - extends ShimBinaryExpression - with GpuExpression - with TimeZoneAwareExpression - with ExpectsInputTypes - with Serializable { + timeZoneId: Option[String] = None, + ansiEnabled: Boolean = SQLConf.get.ansiEnabled) + extends ShimBinaryExpression + with GpuExpression + with TimeZoneAwareExpression + with ExpectsInputTypes + with Serializable { def this(start: Expression, interval: Expression) = this(start, interval, None) override def left: Expression = start override def right: Expression = interval - override def toString: String = s"$left - $right" - override def sql: String = s"${left.sql} - ${right.sql}" - override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, CalendarIntervalType) - - override def dataType: DataType = TimestampType + override def toString: String = s"$left + $right" + override def sql: String = s"${left.sql} + ${right.sql}" override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess val microSecondsInOneDay: Long = TimeUnit.DAYS.toMicros(1) - override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { - withResourceIfAllowed(left.columnarEval(batch)) { lhs => - withResourceIfAllowed(right.columnarEvalAny(batch)) { rhs => - (lhs, rhs) match { - case (l, intvlS: GpuScalar) - if intvlS.dataType.isInstanceOf[CalendarIntervalType] => - // Scalar does not support 'CalendarInterval' now, so use - // the Scala value instead. - // Skip the null check because it wll be detected by the following calls. 
- val intvl = intvlS.getValue.asInstanceOf[CalendarInterval] - if (intvl.months != 0) { - throw new UnsupportedOperationException("Months aren't supported at the moment") - } - val usToSub = intvl.days * microSecondsInOneDay + intvl.microseconds - if (usToSub != 0) { - withResource(Scalar.fromLong(usToSub)) { us_s => - withResource(l.getBase.bitCastTo(DType.INT64)) { us => - withResource(intervalMath(us_s, us)) { longResult => - GpuColumnVector.from(longResult.castTo(DType.TIMESTAMP_MICROSECONDS), dataType) - } - } - } - } else { - l.incRefCount() - } - case _ => - throw new UnsupportedOperationException("only column and interval arguments " + - s"are supported, got left: ${lhs.getClass} right: ${rhs.getClass}") - } - } - } - } - - def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector -} - -case class GpuDateAddInterval(start: Expression, - interval: Expression, - timeZoneId: Option[String] = None, - ansiEnabled: Boolean = SQLConf.get.ansiEnabled) - extends GpuTimeMath(start, interval, timeZoneId) { - override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) } - override def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = { + def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = { us.add(us_s) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressionsUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressionsUtils.scala new file mode 100644 index 00000000000..e34e1cf346c --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressionsUtils.scala @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.rapids
+
+import java.time.ZoneId
+import java.util.concurrent.TimeUnit
+
+import ai.rapids.cudf.{BinaryOp, BinaryOperable, ColumnVector, ColumnView, DType, Scalar}
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.GpuOverrides.isUTCTimezone
+import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
+
+object datetimeExpressionsUtils {
+
+  val microSecondsInOneDay: Long = TimeUnit.DAYS.toMicros(1)
+
+  def timestampAddDurationUs(cv: ColumnVector, duration: BinaryOperable,
+      zoneId: ZoneId): ColumnVector = {
+    assert(cv.getType == DType.TIMESTAMP_MICROSECONDS,
+      "cv should be TIMESTAMP_MICROSECONDS type but got " + cv.getType)
+    assert(duration.getType == DType.DURATION_MICROSECONDS,
+      "duration should be DURATION_MICROSECONDS type but got " + duration.getType)
+    val resWithOverflow = if (isUTCTimezone(zoneId)) {
+      // Do not use cv.add(duration), because it invokes BinaryOperable.implicitConversion,
+      // which currently returns Long.
+      // Directly specify the return type as TIMESTAMP_MICROSECONDS.
+      duration match {
+        case durS: Scalar => {
+          cv.binaryOp(BinaryOp.ADD, durS, DType.TIMESTAMP_MICROSECONDS)
+        }
+        case durC: ColumnView => {
+          cv.binaryOp(BinaryOp.ADD, durC, DType.TIMESTAMP_MICROSECONDS)
+        }
+      }
+    } else {
+      duration match {
+        case durS: Scalar => GpuTimeZoneDB.timeAdd(cv, durS, zoneId)
+        case durC: ColumnView => GpuTimeZoneDB.timeAdd(cv, durC, zoneId)
+      }
+    }
+    closeOnExcept(resWithOverflow) { _ =>
+      timeAddOverflowCheck(cv, duration, resWithOverflow)
+    }
+    resWithOverflow
+  }
+
+  def timestampAddDurationCalendar(cv: ColumnVector, days: Long,
+      microseconds: Long, zoneId: ZoneId): ColumnVector = {
+    assert(cv.getType == DType.TIMESTAMP_MICROSECONDS,
+      "cv should be TIMESTAMP_MICROSECONDS type but got " + cv.getType)
+    val interval = days * microSecondsInOneDay + microseconds
+    if (interval == 0) {
+      return cv.incRefCount()
+    }
+    val resWithOverflow = if (isUTCTimezone(zoneId)) {
+      cv.binaryOp(BinaryOp.ADD, Scalar.durationFromLong(DType.DURATION_MICROSECONDS,
+        interval), DType.TIMESTAMP_MICROSECONDS)
+    } else {
+      // For CalendarInterval, microseconds could be larger than 1 day or negative,
+      // and microseconds in TimeAdd is not affected by timezone, so we need to
+      // calculate days and microseconds separately.
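+      // The days part goes through GpuTimeZoneDB.timeAdd so that transitions in the
+      // given time zone are honored; the remaining microseconds are then added as a
+      // plain duration.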
+ val daysScalar = Scalar.durationFromLong(DType.DURATION_MICROSECONDS, + days * microSecondsInOneDay) + val resDays = withResource(daysScalar) { _ => + GpuTimeZoneDB.timeAdd(cv, daysScalar, zoneId) + } + withResource(resDays) { _ => + resDays.binaryOp(BinaryOp.ADD, Scalar.durationFromLong(DType.DURATION_MICROSECONDS, + microseconds), DType.TIMESTAMP_MICROSECONDS) + } + } + // The sign of duration will be unchanged considering the impact of timezone + closeOnExcept(resWithOverflow) { _ => + withResource(Scalar.durationFromLong(DType.DURATION_MICROSECONDS, + interval)) { duration => + timeAddOverflowCheck(cv, duration, resWithOverflow) + } + } + resWithOverflow + } + + def timeAddOverflowCheck( + cv: ColumnVector, + duration: BinaryOperable, + resWithOverflow: ColumnVector): Unit = { + withResource(resWithOverflow.castTo(DType.INT64)) { resWithOverflowLong => + withResource(cv.bitCastTo(DType.INT64)) { cvLong => + duration match { + case dur: Scalar => + val durLong = Scalar.fromLong(dur.getLong) + withResource(durLong) { _ => + AddOverflowChecks.basicOpOverflowCheck( + cvLong, durLong, resWithOverflowLong, "long overflow") + } + case dur: ColumnView => + withResource(dur.bitCastTo(DType.INT64)) { durationLong => + AddOverflowChecks.basicOpOverflowCheck( + cvLong, durationLong, resWithOverflowLong, "long overflow") + } + case _ => + throw new UnsupportedOperationException("only scalar and column arguments " + + s"are supported, got ${duration.getClass}") + } + } + } + } +} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala index 8a37bd63ca5..405ada9b61e 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,21 +21,65 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.shims -import ai.rapids.cudf.{ColumnVector, ColumnView, Scalar} +import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar} +import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed} +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.shims.ShimBinaryExpression -import org.apache.spark.sql.catalyst.expressions.{Expression, TimeZoneAwareExpression} -import org.apache.spark.sql.rapids.GpuTimeMath +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, TimeZoneAwareExpression} +import org.apache.spark.sql.rapids.datetimeExpressionsUtils +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.unsafe.types.CalendarInterval -case class GpuTimeAdd(start: Expression, - interval: Expression, - timeZoneId: Option[String] = None) - extends GpuTimeMath(start, interval, timeZoneId) { +case class GpuTimeAdd( + start: Expression, + interval: Expression, + timeZoneId: Option[String] = None) + extends ShimBinaryExpression + with GpuExpression + with TimeZoneAwareExpression + with ExpectsInputTypes + with Serializable { + + def this(start: Expression, interval: Expression) = this(start, interval, None) + + override def left: Expression = start + override def right: Expression = interval + + override def toString: String = s"$left + $right" + override def sql: String = s"${left.sql} + ${right.sql}" + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, CalendarIntervalType) + + override def dataType: DataType = TimestampType + + override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) } - override def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = { - us.add(us_s) + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + withResource(left.columnarEval(batch)) { lhs => + withResourceIfAllowed(right.columnarEvalAny(batch)) { rhs => + // lhs is start, rhs is interval + (lhs, rhs) match { + case (l, intvlS: GpuScalar) if intvlS.dataType.isInstanceOf[CalendarIntervalType] => + // Scalar does not support 'CalendarInterval' now, so use + // the Scala value instead. + // Skip the null check because it wll be detected by the following calls. + val intvl = intvlS.getValue.asInstanceOf[CalendarInterval] + if (intvl.months != 0) { + throw new UnsupportedOperationException("Months aren't supported at the moment") + } + val resCv = datetimeExpressionsUtils.timestampAddDurationCalendar(l.getBase, + intvl.days, intvl.microseconds, zoneId) + GpuColumnVector.from(resCv, dataType) + case _ => + throw new UnsupportedOperationException("only column and interval arguments " + + s"are supported, got left: ${lhs.getClass} right: ${rhs.getClass}") + } + } + } } } diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala index 072efe003d2..af901d48b6a 100644 --- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala +++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -211,6 +211,9 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { ("interval", TypeSig.lit(TypeEnum.DAYTIME) + TypeSig.lit(TypeEnum.CALENDAR), TypeSig.DAYTIME + TypeSig.CALENDAR)), (timeAdd, conf, p, r) => new BinaryExprMeta[TimeAdd](timeAdd, conf, p, r) { + + override def isTimeZoneSupported = true + override def tagExprForGpu(): Unit = { GpuOverrides.extractLit(timeAdd.interval).foreach { lit => lit.dataType match { @@ -225,7 +228,7 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { } override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuTimeAdd(lhs, rhs) + GpuTimeAdd(lhs, rhs, timeAdd.timeZoneId) }), GpuOverrides.expr[SpecifiedWindowFrame]( "Specification of the width of the group (or \"frame\") of input rows " + diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala index fdb45beeb74..1e78a638a07 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/datetimeExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,15 +40,17 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.shims +import java.time.ZoneId import java.util.concurrent.TimeUnit -import ai.rapids.cudf.{BinaryOp, BinaryOperable, ColumnVector, ColumnView, DType, Scalar} +import ai.rapids.cudf.{DType, Scalar} import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar} import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.ShimBinaryExpression import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, TimeZoneAwareExpression} +import org.apache.spark.sql.rapids.datetimeExpressionsUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.CalendarInterval @@ -87,8 +89,7 @@ case class GpuTimeAdd(start: Expression, // lhs is start, rhs is interval (lhs, rhs) match { case (l, intervalS: GpuScalar) => - // get long type interval - val interval = intervalS.dataType match { + intervalS.dataType match { case CalendarIntervalType => // Scalar does not support 'CalendarInterval' now, so use // the Scala value instead. 
@@ -97,30 +98,36 @@ case class GpuTimeAdd(start: Expression, if (calendarI.months != 0) { throw new UnsupportedOperationException("Months aren't supported at the moment") } - calendarI.days * microSecondsInOneDay + calendarI.microseconds + val resCv = datetimeExpressionsUtils.timestampAddDurationCalendar(l.getBase, + calendarI.days, calendarI.microseconds, zoneId) + GpuColumnVector.from(resCv, dataType) case _: DayTimeIntervalType => - intervalS.getValue.asInstanceOf[Long] + val interval = intervalS.getValue.asInstanceOf[Long] + // add interval + if (interval != 0) { + val zoneId = ZoneId.of(timeZoneId.getOrElse("UTC")) + val resCv = withResource(Scalar.durationFromLong( + DType.DURATION_MICROSECONDS, interval)) { duration => + datetimeExpressionsUtils.timestampAddDurationUs(l.getBase, duration, zoneId) + } + GpuColumnVector.from(resCv, dataType) + } else { + l.incRefCount() + } case _ => throw new UnsupportedOperationException( "GpuTimeAdd unsupported data type: " + intervalS.dataType) } - - // add interval - if (interval != 0) { - withResource(Scalar.durationFromLong(DType.DURATION_MICROSECONDS, interval)) { d => - GpuColumnVector.from(timestampAddDuration(l.getBase, d), dataType) - } - } else { - l.incRefCount() - } case (l, r: GpuColumnVector) => (l.dataType(), r.dataType) match { case (_: TimestampType, _: DayTimeIntervalType) => // DayTimeIntervalType is stored as long // bitCastTo is similar to reinterpret_cast, it's fast, the time can be ignored. - withResource(r.getBase.bitCastTo(DType.DURATION_MICROSECONDS)) { duration => - GpuColumnVector.from(timestampAddDuration(l.getBase, duration), dataType) + val zoneId = ZoneId.of(timeZoneId.getOrElse("UTC")) + val resCv = withResource(r.getBase.bitCastTo(DType.DURATION_MICROSECONDS)) { dur => + datetimeExpressionsUtils.timestampAddDurationUs(l.getBase, dur, zoneId) } + GpuColumnVector.from(resCv, dataType) case _ => throw new UnsupportedOperationException( "GpuTimeAdd takes column and interval as an argument only") @@ -133,11 +140,4 @@ case class GpuTimeAdd(start: Expression, } } } - - private def timestampAddDuration(cv: ColumnView, duration: BinaryOperable): ColumnVector = { - // Not use cv.add(duration), because of it invoke BinaryOperable.implicitConversion, - // and currently BinaryOperable.implicitConversion return Long - // Directly specify the return type is TIMESTAMP_MICROSECONDS - cv.binaryOp(BinaryOp.ADD, duration, DType.TIMESTAMP_MICROSECONDS) - } } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DayTimeIntervalShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DayTimeIntervalShims.scala index 5d4bece1cba..8b02ea24c67 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DayTimeIntervalShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/DayTimeIntervalShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -53,6 +53,9 @@ object DayTimeIntervalShims { .withPsNote(TypeEnum.CALENDAR, "month intervals are not supported"), TypeSig.DAYTIME + TypeSig.CALENDAR)), (timeAdd, conf, p, r) => new BinaryExprMeta[TimeAdd](timeAdd, conf, p, r) { + + override def isTimeZoneSupported = true + override def tagExprForGpu(): Unit = { GpuOverrides.extractLit(timeAdd.interval).foreach { lit => lit.dataType match { @@ -67,7 +70,7 @@ object DayTimeIntervalShims { } override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuTimeAdd(lhs, rhs) + GpuTimeAdd(lhs, rhs, timeAdd.timeZoneId) }), GpuOverrides.expr[Abs]( "Absolute value", diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala index 288bb38653f..dab577534ca 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala @@ -182,6 +182,31 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl runAndRecordTime("from_utc_timestamp", perfTest) } + test("test timeadd") { + assume(enablePerfTest) + + // cache time zone DB in advance + GpuTimeZoneDB.cacheDatabase() + Thread.sleep(5L) + + def perfTest(spark: SparkSession, zone: String): DataFrame = { + spark.read.parquet(path).selectExpr( + "count(c_ts - (interval -584 days 1563 seconds))", + "count(c_ts - (interval 1943 days 1101 seconds))", + "count(c_ts - (interval 2693 days 2167 seconds))", + "count(c_ts - (interval 2729 days 0 seconds))", + "count(c_ts - (interval 44 days 1534 seconds))", + "count(c_ts - (interval 2635 days 3319 seconds))", + "count(c_ts - (interval 1885 days -2828 seconds))", + "count(c_ts - (interval 0 days 2463 seconds))", + "count(c_ts - (interval 932 days 2286 seconds))", + "count(c_ts - (interval 0 days 0 seconds))" + ) + } + + runAndRecordTime("time_add", perfTest) + } + test("test to_utc_timestamp") { assume(enablePerfTest)