From b58dc2a4ecdd78f81a3560799cd89f42ae667ff2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Nov 2020 09:17:15 -0700 Subject: [PATCH] Support unix_timestamp on GPU for subset of formats (#1113) * Support unix_timestamp on GPU for subset of formats Signed-off-by: Andy Grove * close scalar value Signed-off-by: Andy Grove * compatible formats will now run on GPU without requiring incompatibleOps to be set Signed-off-by: Andy Grove * code cleanup and address more review comments Signed-off-by: Andy Grove * add specific config option for enabling incompatible date formats on GPU * update documentation Signed-off-by: Andy Grove * improve docs Signed-off-by: Andy Grove * use constants for special dates Signed-off-by: Andy Grove * Add support for more date formats and remove incompat from to_unix_timestamp Signed-off-by: Andy Grove * remove debug print Signed-off-by: Andy Grove * Revert unnecessary change Signed-off-by: Andy Grove * Make ToUnixTimestamp consistent with UnixTimestamp Signed-off-by: Andy Grove * refactor to remove duplicate code Signed-off-by: Andy Grove * fix resource leaks and fix regressions in python tests Signed-off-by: Andy Grove * scalstyle Signed-off-by: Andy Grove * update docs Signed-off-by: Andy Grove * fix error in handling of legacyTimeParserPolicy=EXCEPTION Signed-off-by: Andy Grove * fix test failures against Spark 3.1.0 Signed-off-by: Andy Grove --- docs/compatibility.md | 23 ++ docs/configs.md | 5 +- .../src/main/python/date_time_test.py | 10 +- .../spark/rapids/TimeOperatorsSuite.scala | 4 +- .../com/nvidia/spark/rapids/DateUtils.scala | 6 + .../com/nvidia/spark/rapids/GpuCast.scala | 41 ++-- .../nvidia/spark/rapids/GpuOverrides.scala | 27 ++- .../com/nvidia/spark/rapids/RapidsConf.scala | 9 + .../sql/rapids/datetimeExpressions.scala | 199 ++++++++++++++++-- .../spark/rapids/ParseDateTimeSuite.scala | 169 +++++++++++++++ 10 files changed, 433 insertions(+), 60 deletions(-) create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala diff --git a/docs/compatibility.md b/docs/compatibility.md index 718be38b14e9..347dce36e163 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -212,6 +212,29 @@ window functions like `row_number`, `lead`, and `lag` can produce different resu includes both `-0.0` and `0.0`, or if the ordering is ambiguous. Spark can produce different results from one run to another if the ordering is ambiguous on a window function too. +## Parsing strings as dates or timestamps + +When converting strings to dates or timestamps using functions like `to_date` and `unix_timestamp`, +only a subset of possible formats are supported on GPU with full compatibility with Spark. The +supported formats are: + +- `dd/MM/yyyy` +- `yyyy/MM` +- `yyyy/MM/dd` +- `yyyy-MM` +- `yyyy-MM-dd` +- `yyyy-MM-dd HH:mm:ss` + +Other formats may result in incorrect results and will not run on the GPU by default. Some +specific issues with other formats are: + +- Spark supports partial microseconds but the plugin does not +- The plugin will produce incorrect results for input data that is not in the correct format in +some cases + +To enable all formats on GPU, set +[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled) to `true`. + ## Casting between types In general, performing `cast` and `ansi_cast` operations on the GPU is compatible with the same operations on the CPU. However, there are some exceptions. For this reason, certain casts are disabled on the GPU by default and require configuration options to be specified to enable them. diff --git a/docs/configs.md b/docs/configs.md index c678daa46b0a..37636958d674 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -71,6 +71,7 @@ Name | Description | Default Value spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false +spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, setting this to true will force all parsing onto GPU even for formats that can result in incorrect results when parsing invalid inputs.|false spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647 @@ -219,12 +220,12 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.TimeSub| |Subtracts interval from timestamp|true|None| spark.rapids.sql.expression.ToDegrees|`degrees`|Converts radians to degrees|true|None| spark.rapids.sql.expression.ToRadians|`radians`|Converts degrees to radians|true|None| -spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null| +spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|true|None| spark.rapids.sql.expression.UnaryMinus|`negative`|Negate a numeric value|true|None| spark.rapids.sql.expression.UnaryPositive|`positive`|A numeric value with a + in front of it|true|None| spark.rapids.sql.expression.UnboundedFollowing$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None| spark.rapids.sql.expression.UnboundedPreceding$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None| -spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null| +spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|true|None| spark.rapids.sql.expression.Upper|`upper`, `ucase`|String uppercase operator|false|This is not 100% compatible with the Spark version because in some cases unicode characters change byte width when changing the case. The GPU string conversion does not support these characters. For a full list of unsupported characters see https://github.com/rapidsai/cudf/issues/3132| spark.rapids.sql.expression.WeekDay|`weekday`|Returns the day of the week (0 = Monday...6=Sunday)|true|None| spark.rapids.sql.expression.WindowExpression| |Calculates a return value for every input row of a table based on a group (or "window") of rows|true|None| diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 80ecc92a0c7a..a7e7405844d6 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -160,26 +160,23 @@ def test_dayofyear(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.dayofyear(f.col('a')))) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_unix_timestamp(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a')))) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)")) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", + "spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp_improved(data_gen): conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} @@ -190,14 +187,11 @@ def test_to_unix_timestamp_improved(data_gen): (StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'), (ConvertGen(DateGen(nullable=False), lambda d: d.strftime('%Y/%m').zfill(7), data_type=StringType()), 'yyyy/MM')] -@incompat @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_to_unix_timestamp(data_gen, date_form): - print("date: " + date_form) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form))) -@incompat @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_unix_timestamp(data_gen, date_form): assert_gpu_and_cpu_are_equal_collect( diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala index 949a78945ad2..09958a2c6b43 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids +import org.apache.spark.SparkConf import org.apache.spark.sql.functions._ class TimeOperatorsSuite extends SparkQueryCompareTestSuite { @@ -28,7 +29,8 @@ class TimeOperatorsSuite extends SparkQueryCompareTestSuite { } testSparkResultsAreEqual( - "Test from_unixtime with alternative month and two digit year", datesPostEpochDf) { + "Test from_unixtime with alternative month and two digit year", datesPostEpochDf, + conf = new SparkConf().set(RapidsConf.INCOMPATIBLE_DATE_FORMATS.key, "true")) { frame => frame.select(from_unixtime(col("dates"),"dd/LL/yy HH:mm:ss.SSSSSS")) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index a879011580a0..a452dce42b77 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -36,6 +36,12 @@ object DateUtils { "MM" -> "%m", "LL" -> "%m", "dd" -> "%d", "mm" -> "%M", "ss" -> "%S", "HH" -> "%H", "yy" -> "%y", "yyyy" -> "%Y", "SSSSSS" -> "%f") + val ONE_SECOND_MICROSECONDS = 1000000 + + val ONE_DAY_SECONDS = 86400L + + val ONE_DAY_MICROSECONDS = 86400000000L + case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int) /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 119b0147ded9..c1fd0dd382b4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -96,8 +96,6 @@ object GpuCast { "\\A\\d{4}\\-\\d{2}\\-\\d{2}[ T]\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z\\Z" private val TIMESTAMP_REGEX_NO_DATE = "\\A[T]?(\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z)\\Z" - private val ONE_DAY_MICROSECONDS = 86400000000L - /** * Regex for identifying strings that contain numeric values that can be casted to integral * types. This includes floating point numbers but not numbers containing exponents. @@ -122,6 +120,12 @@ object GpuCast { val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number" + val EPOCH = "epoch" + val NOW = "now" + val TODAY = "today" + val YESTERDAY = "yesterday" + val TOMORROW = "tomorrow" + /** * Returns true iff we can cast `from` to `to` using the GPU. */ @@ -182,6 +186,17 @@ object GpuCast { case _ => false } } + + def calculateSpecialDates: Map[String, Int] = { + val now = DateTimeUtils.currentDate(ZoneId.of("UTC")) + Map( + EPOCH -> 0, + NOW -> now, + TODAY -> now, + YESTERDAY -> (now - 1), + TOMORROW -> (now + 1) + ) + } } /** @@ -655,16 +670,6 @@ case class GpuCast( } } - // special dates - val now = DateTimeUtils.currentDate(ZoneId.of("UTC")) - val specialDates: Map[String, Int] = Map( - "epoch" -> 0, - "now" -> now, - "today" -> now, - "yesterday" -> (now - 1), - "tomorrow" -> (now + 1) - ) - var sanitizedInput = input.incRefCount() // replace partial months @@ -677,6 +682,8 @@ case class GpuCast( cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1") } + val specialDates = calculateSpecialDates + withResource(sanitizedInput) { sanitizedInput => // convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd @@ -756,11 +763,11 @@ case class GpuCast( val today: Long = cal.getTimeInMillis * 1000 val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime) val specialDates: Map[String, Long] = Map( - "epoch" -> 0, - "now" -> today, - "today" -> today, - "yesterday" -> (today - ONE_DAY_MICROSECONDS), - "tomorrow" -> (today + ONE_DAY_MICROSECONDS) + GpuCast.EPOCH -> 0, + GpuCast.NOW -> today, + GpuCast.TODAY -> today, + GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS), + GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS) ) var sanitizedInput = input.incRefCount() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index f806bca71dbb..edc67b6ef2d1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python._ import org.apache.spark.sql.execution.window.WindowExec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids._ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand import org.apache.spark.sql.rapids.execution.{GpuBroadcastMeta, GpuBroadcastNestedLoopJoinMeta, GpuCustomShuffleReaderExec, GpuShuffleMeta} @@ -1142,28 +1143,24 @@ object GpuOverrides { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = { if (conf.isImprovedTimestampOpsEnabled) { // passing the already converted strf string for a little optimization - GpuToUnixTimestampImproved(lhs, rhs, strfFormat) + GpuToUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat) } else { - GpuToUnixTimestamp(lhs, rhs, strfFormat) + GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat) } } - }) - .incompat("Incorrectly formatted strings and bogus dates produce garbage data" + - " instead of null"), + }), expr[UnixTimestamp]( "Returns the UNIX timestamp of current or specified time", (a, conf, p, r) => new UnixTimeExprMeta[UnixTimestamp](a, conf, p, r){ override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = { if (conf.isImprovedTimestampOpsEnabled) { // passing the already converted strf string for a little optimization - GpuUnixTimestampImproved(lhs, rhs, strfFormat) + GpuUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat) } else { - GpuUnixTimestamp(lhs, rhs, strfFormat) + GpuUnixTimestamp(lhs, rhs, sparkFormat, strfFormat) } } - }) - .incompat("Incorrectly formatted strings and bogus dates produce garbage data" + - " instead of null"), + }), expr[Hour]( "Returns the hour component of the string/timestamp", (a, conf, p, r) => new UnaryExprMeta[Hour](a, conf, p, r) { @@ -2094,6 +2091,16 @@ object GpuOverrides { ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = commonExecs ++ ShimLoader.getSparkShims.getExecs + + def getTimeParserPolicy: TimeParserPolicy = { + val policy = SQLConf.get.getConfString(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "EXCEPTION") + policy match { + case "LEGACY" => LegacyTimeParserPolicy + case "EXCEPTION" => ExceptionTimeParserPolicy + case "CORRECTED" => CorrectedTimeParserPolicy + } + } + } /** Tag the initial plan when AQE is enabled */ case class GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 27161b08284b..e89796c5c0ec 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -427,6 +427,13 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val INCOMPATIBLE_DATE_FORMATS = conf("spark.rapids.sql.incompatibleDateFormats.enabled") + .doc("When parsing strings as dates and timestamps in functions like unix_timestamp, " + + "setting this to true will force all parsing onto GPU even for formats that can " + + "result in incorrect results when parsing invalid inputs.") + .booleanConf + .createWithDefault(false) + val IMPROVED_FLOAT_OPS = conf("spark.rapids.sql.improvedFloatOps.enabled") .doc("For some floating point operations spark uses one way to compute the value " + "and the underlying cudf implementation can use an improved algorithm. " + @@ -922,6 +929,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS) + lazy val incompatDateFormats: Boolean = get(INCOMPATIBLE_DATE_FORMATS) + lazy val includeImprovedFloat: Boolean = get(IMPROVED_FLOAT_OPS) lazy val pinnedPoolSize: Long = get(PINNED_POOL_SIZE) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 3c48dfcd2e46..0db90cca7af7 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.rapids import java.time.ZoneId import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar} -import com.nvidia.spark.rapids.{BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException -import com.nvidia.spark.rapids.GpuOverrides.extractStringLit +import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import org.apache.spark.{SPARK_VERSION, SparkUpgradeException} import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -285,6 +286,7 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi (expr: A, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], rule: ConfKeysAndIncompat) extends BinaryExprMeta[A](expr, conf, parent, rule) { + var sparkFormat: String = _ var strfFormat: String = _ override def tagExprForGpu(): Unit = { if (ZoneId.of(expr.timeZoneId.get).normalized() != GpuOverrides.UTC_TIMEZONE_ID) { @@ -293,11 +295,20 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi // Date and Timestamp work too if (expr.right.dataType == StringType) { try { - val rightLit = extractStringLit(expr.right) - if (rightLit.isDefined) { - strfFormat = DateUtils.toStrf(rightLit.get) - } else { - willNotWorkOnGpu("format has to be a string literal") + extractStringLit(expr.right) match { + case Some(rightLit) => + sparkFormat = rightLit + if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { + willNotWorkOnGpu("legacyTimeParserPolicy LEGACY is not supported") + } else if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) || + conf.incompatDateFormats) { + strfFormat = DateUtils.toStrf(sparkFormat) + } else { + willNotWorkOnGpu(s"incompatible format '$sparkFormat'. Set " + + s"spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.") + } + case None => + willNotWorkOnGpu("format has to be a string literal") } } catch { case x: TimestampFormatConversionException => @@ -307,6 +318,125 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi } } +sealed trait TimeParserPolicy extends Serializable +object LegacyTimeParserPolicy extends TimeParserPolicy +object ExceptionTimeParserPolicy extends TimeParserPolicy +object CorrectedTimeParserPolicy extends TimeParserPolicy + +object GpuToTimestamp extends Arm { + /** We are compatible with Spark for these formats */ + val COMPATIBLE_FORMATS = Seq( + "yyyy-MM-dd", + "yyyy-MM", + "yyyy/MM/dd", + "yyyy/MM", + "dd/MM/yyyy", + "yyyy-MM-dd HH:mm:ss" + ) + + val specialDatesSeconds = GpuCast.calculateSpecialDates + .map { + case (name, days) => (name, days * DateUtils.ONE_DAY_SECONDS) + } + + val specialDatesMicros = GpuCast.calculateSpecialDates + .map { + case (name, days) => (name, days * DateUtils.ONE_DAY_MICROSECONDS) + } + + def daysScalarSeconds(name: String): Scalar = { + Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSeconds(name)) + } + + def daysScalarMicros(name: String): Scalar = { + Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDatesMicros(name)) + } + + def daysEqual(col: ColumnVector, name: String): ColumnVector = { + withResource(Scalar.fromString(name)) { scalarName => + col.equalTo(scalarName) + } + } + + def isTimestamp(col: ColumnVector, sparkFormat: String, strfFormat: String) : ColumnVector = { + if (COMPATIBLE_FORMATS.contains(sparkFormat)) { + // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime + // and ToUnixTime and will support parsing a subset of a string so we check the length of + // the string as well which works well for fixed-length formats but if/when we want to + // support variable-length formats (such as timestamps with milliseconds) then we will need + // to use regex instead. + withResource(col.getCharLengths) { actualLen => + withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => + withResource(actualLen.equalTo(expectedLen)) { lengthOk => + withResource(col.isTimestamp(strfFormat)) { isTimestamp => + isTimestamp.and(lengthOk) + } + } + } + } + } else { + // this is the incompatibleDateFormats case where we do not guarantee compatibility with + // Spark and assume that all non-null inputs are valid + ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt) + } + } + + def parseStringAsTimestamp( + lhs: GpuColumnVector, + sparkFormat: String, + strfFormat: String, + timeParserPolicy: TimeParserPolicy, + dtype: DType, + daysScalar: String => Scalar, + asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = { + + val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat) + + // in addition to date/timestamp strings, we also need to check for special dates and null + // values, since anything else is invalid and should throw an error or be converted to null + // depending on the policy + withResource(isTimestamp) { isTimestamp => + withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => + withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => + withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => + withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => + withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => + withResource(lhs.getBase.isNull) { isNull => + withResource(Scalar.fromNull(dtype)) { nullValue => + withResource(asTimestamp(lhs.getBase, strfFormat)) { converted => + withResource(daysScalar(GpuCast.EPOCH)) { epoch => + withResource(daysScalar(GpuCast.NOW)) { now => + withResource(daysScalar(GpuCast.TODAY)) { today => + withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday => + withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow => + withResource(isTomorrow.ifElse(tomorrow, nullValue)) { a => + withResource(isYesterday.ifElse(yesterday, a)) { b => + withResource(isToday.ifElse(today, b)) { c => + withResource(isNow.ifElse(now, c)) { d => + withResource(isEpoch.ifElse(epoch, d)) { e => + isTimestamp.ifElse(converted, e) + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } +} + /** * A direct conversion of Spark's ToTimestamp class which converts time to UNIX timestamp by * first converting to microseconds and then dividing by the downScaleFactor @@ -314,8 +444,11 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi abstract class GpuToTimestamp extends GpuBinaryExpression with TimeZoneAwareExpression with ExpectsInputTypes { - def downScaleFactor = 1000000 // MICROS IN SECOND + import GpuToTimestamp._ + + def downScaleFactor = DateUtils.ONE_SECOND_MICROSECONDS + def sparkFormat: String def strfFormat: String override def inputTypes: Seq[AbstractDataType] = @@ -326,6 +459,8 @@ abstract class GpuToTimestamp override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess + val timeParserPolicy = getTimeParserPolicy + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = { throw new IllegalArgumentException("rhs has to be a scalar for the unixtimestamp to work") } @@ -338,7 +473,14 @@ abstract class GpuToTimestamp override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - lhs.getBase.asTimestampMicroseconds(strfFormat) + parseStringAsTimestamp( + lhs, + sparkFormat, + strfFormat, + timeParserPolicy, + DType.TIMESTAMP_MICROSECONDS, + daysScalarMicros, + (col, strfFormat) => col.asTimestampMicroseconds(strfFormat)) } else { // Timestamp or DateType lhs.getBase.asTimestampMicroseconds() } @@ -364,10 +506,19 @@ abstract class GpuToTimestamp * first converting to microseconds */ abstract class GpuToTimestampImproved extends GpuToTimestamp { + import GpuToTimestamp._ + override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - lhs.getBase.asTimestampSeconds(strfFormat) + parseStringAsTimestamp( + lhs, + sparkFormat, + strfFormat, + timeParserPolicy, + DType.TIMESTAMP_SECONDS, + daysScalarSeconds, + (col, strfFormat) => col.asTimestampSeconds(strfFormat)) } else if (lhs.dataType() == DateType){ lhs.getBase.asTimestampSeconds() } else { // Timestamp @@ -397,9 +548,10 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp { } case class GpuUnixTimestamp(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestamp { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestamp { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) @@ -411,9 +563,10 @@ case class GpuUnixTimestamp(strTs: Expression, } case class GpuToUnixTimestamp(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestamp { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestamp { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) @@ -425,9 +578,10 @@ case class GpuToUnixTimestamp(strTs: Expression, } case class GpuUnixTimestampImproved(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestampImproved { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestampImproved { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) @@ -439,9 +593,10 @@ case class GpuUnixTimestampImproved(strTs: Expression, } case class GpuToUnixTimestampImproved(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestampImproved { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestampImproved { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala new file mode 100644 index 000000000000..558270de2908 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.{col, to_date, unix_timestamp} +import org.apache.spark.sql.internal.SQLConf + +class ParseDateTimeSuite extends SparkQueryCompareTestSuite { + + val execsAllowedNonGpu = ShimLoader.getSparkShims.getSparkShimVersion match { + case SparkShimVersion(3, 1, _) => + // The behavior has changed in Spark 3.1.0 and `to_date` gets translated to + // `cast(gettimestamp(c0#20108, yyyy-MM-dd, Some(UTC)) as date)` and we do + // not currently support `gettimestamp` on GPU + // https://github.com/NVIDIA/spark-rapids/issues/1157 + Seq("ProjectExec,Alias,Cast,GetTimestamp,Literal") + case _ => + Seq.empty + } + + testSparkResultsAreEqual("to_date yyyy-MM-dd", + datesAsStrings, + conf = new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED"), + execsAllowedNonGpu = execsAllowedNonGpu) { + df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd")) + } + + testSparkResultsAreEqual("to_date dd/MM/yyyy", + datesAsStrings, + conf = new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED"), + execsAllowedNonGpu = execsAllowedNonGpu) { + df => df.withColumn("c1", to_date(col("c0"), "dd/MM/yyyy")) + } + + testSparkResultsAreEqual("to_date default pattern", + datesAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", to_date(col("c0"))) + } + + testSparkResultsAreEqual("unix_timestamp parse date", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd")) + } + + testSparkResultsAreEqual("unix_timestamp parse yyyy/MM", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy/MM")) + } + + testSparkResultsAreEqual("to_unix_timestamp parse yyyy/MM", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => { + df.createOrReplaceTempView("df") + df.sqlContext.sql("SELECT c0, to_unix_timestamp(c0, 'yyyy/MM') FROM df") + } + } + + testSparkResultsAreEqual("to_unix_timestamp parse yyyy/MM (improvedTimeOps)", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED") + .set(RapidsConf.IMPROVED_TIMESTAMP_OPS.key, "true")) { + df => { + df.createOrReplaceTempView("df") + df.sqlContext.sql("SELECT c0, to_unix_timestamp(c0, 'yyyy/MM') FROM df") + } + } + + testSparkResultsAreEqual("unix_timestamp parse timestamp", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) + } + + testSparkResultsAreEqual("unix_timestamp parse timestamp millis (fall back to CPU)", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, "ProjectExec,Alias,UnixTimestamp,Literal")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss.SSS")) + } + + testSparkResultsAreEqual("unix_timestamp parse timestamp default pattern", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"))) + } + + test("fall back to CPU when policy is LEGACY") { + val e = intercept[IllegalArgumentException] { + val df = withGpuSparkSession(spark => { + timestampsAsStrings(spark) + .repartition(2) + .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) + }, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "LEGACY")) + df.collect() + } + assert(e.getMessage.contains( + "Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec")) + } + + private def timestampsAsStrings(spark: SparkSession) = { + import spark.implicits._ + timestampValues.toDF("c0") + } + + private def datesAsStrings(spark: SparkSession) = { + import spark.implicits._ + val values = Seq( + GpuCast.EPOCH, + GpuCast.NOW, + GpuCast.TODAY, + GpuCast.YESTERDAY, + GpuCast.TOMORROW + ) ++ timestampValues + values.toDF("c0") + } + + private val timestampValues = Seq( + "", + "null", + null, + "\n", + "1999-12-31 ", + "1999-12-31 11", + "1999-12-31 11:", + "1999-12-31 11:5", + "1999-12-31 11:59", + "1999-12-31 11:59:", + "1999-12-31 11:59:5", + "1999-12-31 11:59:59", + "1999-12-31 11:59:59.", + "1999-12-31 11:59:59.9", + "1999-12-31 11:59:59.99", + "1999-12-31 11:59:59.999", + "31/12/1999", + "31/12/1999 11:59:59.999", + "1999-12-31", + "1999/12/31", + "1999-12", + "1999/12", + "1975/06", + "1975/06/18", + "1975/06/18 06:48:57", + "1999-12-31\n", + "\t1999-12-31", + "\n1999-12-31", + "1999/12/31" + ) +} +