From 16515f766d68465fba7753fedd0bbb1fea9060f9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 20 Nov 2023 08:13:32 -0700
Subject: [PATCH] Add support for parsing strings as dates in `from_json`
 [databricks] (#9666)

* improve tests
* remove debug
* update docs, test for nulls

Signed-off-by: Andy Grove

* add link to issue
* Revert newline
* Improve error message
* update docs
* Add shims
* add tests with leading and trailing whitespace
* handle more whitespace edge cases
* handle more whitespace edge cases
* xfail instead of skip, fix invalid regex
* Use GpuJsonUtils.dateFormatInRead
* More use of parsed JSONOptions
* bug fix and test for fallback for unsupported dateFormat
* update shims
* fix 330 shim
* Fallback for LEGACY timeParserPolicy and add fallback tests
* fix 350 build
* fix build on 321db
* fix build on 321db

---------

Signed-off-by: Andy Grove

---
 docs/compatibility.md                         | 10 ++-
 docs/supported_ops.md                         |  4 +-
 .../src/main/python/json_test.py              | 78 +++++++++++++++++++
 .../com/nvidia/spark/rapids/GpuCast.scala     |  2 +-
 .../nvidia/spark/rapids/GpuOverrides.scala    |  4 +-
 .../catalyst/json/rapids/GpuJsonScan.scala    | 22 +++++-
 .../spark/sql/rapids/GpuJsonToStructs.scala   | 13 +++-
 .../rapids/shims/GpuJsonToStructsShim.scala   | 60 ++++++++++++++
 .../sql/catalyst/json/GpuJsonUtils.scala      | 20 +++++
 .../sql/catalyst/json/GpuJsonUtils.scala      | 18 +++++
 .../sql/catalyst/json/GpuJsonUtils.scala      | 17 ++++
 .../rapids/shims/GpuJsonToStructsShim.scala   | 49 ++++++++++++
 .../sql/catalyst/json/GpuJsonUtils.scala      | 21 +++++
 13 files changed, 307 insertions(+), 11 deletions(-)
 create mode 100644 sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala
 create mode 100644 sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala

diff --git a/docs/compatibility.md b/docs/compatibility.md
index ac90d309fe1..370d61e5b0c 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -322,9 +322,15 @@ This particular function supports to output a map or struct type with limited fu
 The `from_json` function is disabled by default because it is experimental and has some known
 incompatibilities with Spark, and can be enabled by setting
 `spark.rapids.sql.expression.JsonToStructs=true`.
 
-There are several known issues:
+Dates are partially supported, but there are some known issues:
 
-Dates and timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
+- Only the default `dateFormat` of `yyyy-MM-dd` is supported. The query will fall back to CPU if any other
+  format is specified ([#9667](https://github.com/NVIDIA/spark-rapids/issues/9667)).
+- Strings containing integers with more than four digits will be parsed as null
+  ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)). Spark versions prior to 3.4 parse these
+  numbers as a number of days since the epoch; Spark 3.4 and later throw an exception.
+
+Timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
 
 When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting
 for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 601c9db995b..490ec771ab0 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
 NS
-PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
-PS<br/>unsupported child types DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
+PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
+PS<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
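The two documentation diffs above describe the user-visible behavior. As a quick illustration, the following spark-shell sketch exercises it end to end. This is not part of the patch: the DataFrame and column names are made up, and it assumes the RAPIDS plugin is already on the classpath.

```scala
// Hedged usage sketch, not part of the patch.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DateType, StructField, StructType}
import spark.implicits._

// from_json is experimental on the GPU and must be enabled explicitly.
spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")

val df = Seq("""{ "a": "2023-11-20" }""").toDF("json")
val schema = StructType(Seq(StructField("a", DateType)))

// Default dateFormat (yyyy-MM-dd): eligible to run on the GPU.
df.select(from_json($"json", schema).alias("parsed")).show()

// Any other dateFormat triggers CPU fallback (issue #9667).
df.select(from_json($"json", schema, Map("dateFormat" -> "dd/MM/yyyy"))).show()
```
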
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 5b7cee85440..e3f50727619 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -523,6 +523,84 @@ def test_from_json_struct_decimal():
             .select(f.from_json('a', 'struct')),
         conf={"spark.rapids.sql.expression.JsonToStructs": True})
 
+@pytest.mark.parametrize('date_gen', [
+    # "yyyy-MM-dd"
+    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
+    # "yyyy-MM"
+    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
+    # "yyyy"
+    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[0-9]{4}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
+    # "dd/MM/yyyy"
+    "\"[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}\"",
+    # special constant values
+    "\"(now|today|tomorrow|epoch)\"",
+    # "nnnnn" (number of days since epoch prior to Spark 3.4, throws exception from 3.4)
+    pytest.param("\"[0-9]{5}\"", marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")),
+    # integral
+    "[0-9]{1,5}",
+    # floating-point
+    "[0-9]{0,2}\\.[0-9]{1,2}",
+    # boolean
+    "(true|false)"
+])
+@pytest.mark.parametrize('date_format', [
+    "",
+    "yyyy-MM-dd",
+    # https://github.com/NVIDIA/spark-rapids/issues/9667
+    pytest.param("dd/MM/yyyy", marks=pytest.mark.allow_non_gpu('ProjectExec')),
+])
+@pytest.mark.parametrize('time_parser_policy', [
+    pytest.param("LEGACY", marks=pytest.mark.allow_non_gpu('ProjectExec')),
+    "CORRECTED"
+])
+def test_from_json_struct_date(date_gen, date_format, time_parser_policy):
+    json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
+        .with_special_case('{ "a": null }') \
+        .with_special_case('null')
+    options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, json_string_gen) \
+            .select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
+        conf={"spark.rapids.sql.expression.JsonToStructs": True,
+              'spark.sql.legacy.timeParserPolicy': time_parser_policy})
+
+@allow_non_gpu('ProjectExec')
+@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""])
+@pytest.mark.parametrize('date_format', [
+    "",
+    "yyyy-MM-dd",
+])
+def test_from_json_struct_date_fallback_legacy(date_gen, date_format):
+    json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
+        .with_special_case('{ "a": null }') \
+        .with_special_case('null')
+    options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
+    assert_gpu_fallback_collect(
+        lambda spark : unary_op_df(spark, json_string_gen) \
+            .select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
+        'ProjectExec',
+        conf={"spark.rapids.sql.expression.JsonToStructs": True,
+              'spark.sql.legacy.timeParserPolicy': 'LEGACY'})
+
+@allow_non_gpu('ProjectExec')
+@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""])
+@pytest.mark.parametrize('date_format', [
+    "dd/MM/yyyy",
+    "yyyy/MM/dd",
+])
+def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format):
+    json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
+        .with_special_case('{ "a": null }') \
+        .with_special_case('null')
+    options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
+    assert_gpu_fallback_collect(
+        lambda spark : unary_op_df(spark, json_string_gen) \
+            .select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
+        'ProjectExec',
+        conf={"spark.rapids.sql.expression.JsonToStructs": True,
+              'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})
+
+
 @pytest.mark.parametrize('schema', ['struct', 'struct>', 'struct>'])
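The generators above deliberately cover optional Unicode whitespace around the date, partial dates (`yyyy`, `yyyy-MM`), special constants, and non-string values. As a reference, here is a hedged summary of what the GPU path should return for representative inputs under the default `dateFormat` and `CORRECTED` timeParserPolicy; the expected values are inferred from the docs and tests in this patch, not from running it:

```scala
// Representative inputs and the GPU result inferred from the docs/tests above
// (assumes default dateFormat "yyyy-MM-dd" and CORRECTED timeParserPolicy).
val expectedOnGpu: Seq[(String, Option[String])] = Seq(
  """{ "a": "2023-11-20" }"""   -> Some("2023-11-20"), // canonical form parses
  """{ "a": " 2023-11-20 " }""" -> Some("2023-11-20"), // surrounding whitespace is trimmed
  """{ "a": "2023-11" }"""      -> Some("2023-11-01"), // partial dates default the missing fields
  """{ "a": "20/11/2023" }"""   -> None,               // non-default layout parses as null
  """{ "a": "12345" }"""        -> None,               // >4-digit integers: null on GPU (issue #9664)
  """{ "a": null }"""           -> None                // explicit null stays null
)
```
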
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 5b7cee85440..e3f50727619 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -523,6 +523,84 @@ def test_from_json_struct_decimal(): .select(f.from_json('a', 'struct')), conf={"spark.rapids.sql.expression.JsonToStructs": True}) +@pytest.mark.parametrize('date_gen', [ + # "yyyy-MM-dd" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"", + # "yyyy-MM" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"", + # "yyyy" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[0-9]{4}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"", + # "dd/MM/yyyy" + "\"[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}\"", + # special constant values + "\"(now|today|tomorrow|epoch)\"", + # "nnnnn" (number of days since epoch prior to Spark 3.4, throws exception from 3.4) + pytest.param("\"[0-9]{5}\"", marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")), + # integral + "[0-9]{1,5}", + # floating-point + "[0-9]{0,2}\\.[0-9]{1,2}" + # boolean + "(true|false)" +]) +@pytest.mark.parametrize('date_format', [ + "", + "yyyy-MM-dd", + # https://github.com/NVIDIA/spark-rapids/issues/9667 + pytest.param("dd/MM/yyyy", marks=pytest.mark.allow_non_gpu('ProjectExec')), +]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param("LEGACY", marks=pytest.mark.allow_non_gpu('ProjectExec')), + "CORRECTED" +]) +def test_from_json_struct_date(date_gen, date_format, time_parser_policy): + json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \ + .with_special_case('{ "a": null }') \ + .with_special_case('null') + options = { 'dateFormat': date_format } if len(date_format) > 0 else { } + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.col('a'), f.from_json('a', 'struct', options)), + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy}) + +@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""]) +@pytest.mark.parametrize('date_format', [ + "", + "yyyy-MM-dd", +]) +def test_from_json_struct_date_fallback_legacy(date_gen, date_format): + json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \ + .with_special_case('{ "a": null }') \ + .with_special_case('null') + options = { 'dateFormat': date_format } if len(date_format) > 0 else { } + assert_gpu_fallback_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.col('a'), f.from_json('a', 'struct', options)), + 'ProjectExec', + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.sql.legacy.timeParserPolicy': 'LEGACY'}) + +@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""]) +@pytest.mark.parametrize('date_format', [ + "dd/MM/yyyy", + "yyyy/MM/dd", +]) +def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format): + json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \ + .with_special_case('{ "a": null }') \ + .with_special_case('null') + options = { 'dateFormat': date_format } if len(date_format) > 0 else { } + assert_gpu_fallback_collect( + lambda spark : 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
index 6634c946d47..2d1ba0d3c3b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -1347,7 +1347,7 @@ object GpuCast {
    * `yyyy-[m]m-[d]d *`
    * `yyyy-[m]m-[d]dT*`
    */
-  private def castStringToDate(sanitizedInput: ColumnVector): ColumnVector = {
+  def castStringToDate(sanitizedInput: ColumnVector): ColumnVector = {
     // convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd
     val converted = convertDateOr(sanitizedInput, DATE_REGEX_YYYY_MM_DD, "%Y-%m-%d",
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 28af8a163b2..bdeae65a975 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3570,7 +3570,7 @@ object GpuOverrides extends Logging {
       "Returns a struct value with the given `jsonStr` and `schema`",
       ExprChecks.projectOnly(
         TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral +
-          TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN) +
+          TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE) +
           TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
             "MAP only supports keys and values that are of STRING type"),
         (TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
@@ -3584,7 +3584,7 @@ object GpuOverrides extends Logging {
             willNotWorkOnGpu("from_json on GPU only supports MapType " +
               "or StructType schema")
           }
-          GpuJsonScan.tagJsonToStructsSupport(a.options, this)
+          GpuJsonScan.tagJsonToStructsSupport(a.options, a.dataType, this)
         }
 
         override def convertToGpu(child: Expression): GpuExpression =
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
index 3d738db4d72..5c730bc23bf 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
@@ -25,7 +25,7 @@ import ai.rapids.cudf
 import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, NvtxColor, RegexProgram, Scalar, Schema, Table}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.Arm.withResource
-import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, ShimFilePartitionReaderFactory}
+import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.broadcast.Broadcast
@@ -40,7 +40,8 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, Partitioning
 import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.types.{DataType, DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
@@ -103,12 +104,27 @@ object GpuJsonScan {
   }
 
   def tagJsonToStructsSupport(options:Map[String, String],
-      meta: RapidsMeta[_, _, _]): Unit = {
+      dt: DataType,
+      meta: RapidsMeta[_, _, _]): Unit = {
     val parsedOptions = new JSONOptionsInRead(
       options,
       SQLConf.get.sessionLocalTimeZone,
       SQLConf.get.columnNameOfCorruptRecord)
 
+    val hasDates = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[DateType])
+    if (hasDates) {
+      GpuJsonUtils.optionalDateFormatInRead(parsedOptions) match {
+        case None | Some("yyyy-MM-dd") =>
+          // this is fine
+        case dateFormat =>
+          meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
+      }
+    }
+
+    if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) {
+      meta.willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuJsonToStructs")
+    }
+
     tagSupportOptions(parsedOptions, meta)
   }
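The tagging added to GpuJsonScan and GpuOverrides above reduces to a small decision rule. The following standalone sketch restates it outside the plugin's metadata classes; `canRunOnGpu` and `hasDateType` are illustrative names, not plugin APIs, and the real code reports reasons through `meta.willNotWorkOnGpu` rather than returning them:

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, StructType}

// Recursive existence check, standing in for TrampolineUtil.dataTypeExistsRecursively.
def hasDateType(dt: DataType): Boolean = dt match {
  case _: DateType   => true
  case s: StructType => s.fields.exists(f => hasDateType(f.dataType))
  case a: ArrayType  => hasDateType(a.elementType)
  case m: MapType    => hasDateType(m.keyType) || hasDateType(m.valueType)
  case _             => false
}

// A from_json schema containing dates stays on the GPU only when the
// dateFormat is absent or the default, and the parser policy is not LEGACY.
def canRunOnGpu(
    schema: DataType,
    dateFormat: Option[String],
    legacyTimeParserPolicy: Boolean): Either[String, Unit] = {
  if (hasDateType(schema) && !dateFormat.forall(_ == "yyyy-MM-dd")) {
    Left(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
  } else if (legacyTimeParserPolicy) {
    Left("LEGACY timeParserPolicy is not supported in GpuJsonToStructs")
  } else {
    Right(())
  }
}
```
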
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
index 85a6cdabfb0..f2de53483b0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
@@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
 import com.nvidia.spark.rapids.GpuCast.doCast
 import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
 import com.nvidia.spark.rapids.jni.MapUtils
+import com.nvidia.spark.rapids.shims.GpuJsonToStructsShim
 import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
@@ -210,6 +211,10 @@ case class GpuJsonToStructs(
             (sparkType, dtype) match {
               case (DataTypes.StringType, DataTypes.BooleanType) =>
                 castJsonStringToBool(col)
+              case (DataTypes.StringType, DataTypes.DateType) =>
+                GpuJsonToStructsShim.castJsonStringToDate(col, options)
+              case (_, DataTypes.DateType) =>
+                castToNullDate(input.getBase)
               case _ =>
                 doCast(col, sparkType, dtype)
             }
@@ -235,7 +240,7 @@ case class GpuJsonToStructs(
 
   private def castJsonStringToBool(input: ColumnVector): ColumnVector = {
     val isTrue = withResource(Scalar.fromString("true")) { trueStr =>
-        input.equalTo(trueStr)
+      input.equalTo(trueStr)
     }
     withResource(isTrue) { _ =>
       val isFalse = withResource(Scalar.fromString("false")) { falseStr =>
@@ -256,6 +261,12 @@ case class GpuJsonToStructs(
     }
   }
 
+  private def castToNullDate(input: ColumnVector): ColumnVector = {
+    withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { nullScalar =>
+      ColumnVector.fromScalar(nullScalar, input.getRowCount.toInt)
+    }
+  }
+
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
     copy(timeZoneId = Option(timeZoneId))
 
diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala
new file mode 100644
index 00000000000..7e6709388f3
--- /dev/null
+++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*** spark-rapids-shim-json-lines
+{"spark": "311"}
+{"spark": "312"}
+{"spark": "313"}
+{"spark": "320"}
+{"spark": "321"}
+{"spark": "321cdh"}
+{"spark": "321db"}
+{"spark": "322"}
+{"spark": "323"}
+{"spark": "324"}
+{"spark": "330"}
+{"spark": "330cdh"}
+{"spark": "330db"}
+{"spark": "331"}
+{"spark": "332"}
+{"spark": "332cdh"}
+{"spark": "332db"}
+{"spark": "333"}
+spark-rapids-shim-json-lines ***/
+package com.nvidia.spark.rapids.shims
+
+import ai.rapids.cudf.{ColumnVector, Scalar}
+import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.GpuCast
+
+import org.apache.spark.sql.catalyst.json.GpuJsonUtils
+
+object GpuJsonToStructsShim {
+
+  def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
+    GpuJsonUtils.dateFormatInRead(options) match {
+      case "yyyy-MM-dd" =>
+        withResource(Scalar.fromString(" ")) { space =>
+          withResource(input.strip(space)) { trimmed =>
+            GpuCast.castStringToDate(trimmed)
+          }
+        }
+      case other =>
+        // should be unreachable due to GpuOverrides checks
+        throw new IllegalStateException(s"Unsupported dateFormat $other")
+    }
+  }
+
+}
\ No newline at end of file
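Note the design choice in this pre-3.4 shim: it strips surrounding ASCII spaces and delegates to the lenient `GpuCast.castStringToDate` cascade, which also accepts `yyyy` and `yyyy-MM`, while the Spark 3.4+ shim later in this patch switches to a strict full-string match when an explicit `yyyy-MM-dd` dateFormat is present. A hedged sketch of that difference, with illustrative helper names and simplified patterns:

```scala
// Illustrative comparison of the two matching modes the shims implement;
// lenientMatches mirrors the pre-3.4 cascade (yyyy, yyyy-MM, yyyy-MM-dd),
// strictMatches mirrors the 3.4+ regex used for an explicit "yyyy-MM-dd".
val lenientPatterns = Seq("^\\d{4}$", "^\\d{4}-\\d{2}$", "^\\d{4}-\\d{2}-\\d{2}$")

def lenientMatches(s: String): Boolean =
  lenientPatterns.exists(re => s.trim.matches(re))

def strictMatches(s: String): Boolean =
  s.matches("^[0-9]{4}-[0-9]{2}-[0-9]{2}$")

assert(lenientMatches(" 2023-11 "))  // lenient: accepted, parsed as 2023-11-01
assert(!strictMatches(" 2023-11 ")) // strict: rejected, yields null
assert(strictMatches("2023-11-20")) // both accept complete dates
```
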
diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
index afab73a07b5..6a3c63ca2e9 100644
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
+++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
@@ -27,8 +27,28 @@ spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.catalyst.json
 
+import org.apache.spark.sql.internal.SQLConf
+
 object GpuJsonUtils {
+
+  def optionalDateFormatInRead(options: JSONOptions): Option[String] =
+    Some(options.dateFormat)
+
+  def optionalDateFormatInRead(options: Map[String, String]): Option[String] =
+    optionalDateFormatInRead(parseJSONReadOptions(options))
+
   def dateFormatInRead(options: JSONOptions): String = options.dateFormat
+
+  def dateFormatInRead(options: Map[String, String]): String =
+    dateFormatInRead(parseJSONReadOptions(options))
+
   def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat
 
   def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false
+
+  def parseJSONReadOptions(options: Map[String, String]) = {
+    new JSONOptionsInRead(
+      options,
+      SQLConf.get.sessionLocalTimeZone,
+      SQLConf.get.columnNameOfCorruptRecord)
+  }
 }
diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
index a4539d9b026..ab673be12f5 100644
--- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
+++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
@@ -23,9 +23,19 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.internal.SQLConf
 
 object GpuJsonUtils {
+
+  def optionalDateFormatInRead(options: Map[String, String]): Option[String] = {
+    optionalDateFormatInRead(parseJSONReadOptions(options))
+  }
+
+  def optionalDateFormatInRead(options: JSONOptions): Option[String] =
+    options.dateFormatInRead
+
   def dateFormatInRead(options: JSONOptions): String =
     options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)
+
+  def dateFormatInRead(options: Map[String, String]): String =
+    dateFormatInRead(parseJSONReadOptions(options))
+
   def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
     if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -34,4 +44,12 @@ object GpuJsonUtils {
     })
 
   def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false
+
+  def parseJSONReadOptions(options: Map[String, String]) = {
+    new JSONOptionsInRead(
+      options,
+      SQLConf.get.sessionLocalTimeZone,
+      SQLConf.get.columnNameOfCorruptRecord)
+  }
+
 }
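The pattern repeated across these GpuJsonUtils shims is that `optionalDateFormatInRead` is always `Some(...)` on Spark 3.1.x, where `JSONOptions.dateFormat` has a built-in default, but can be `None` on later versions, where the format is truly optional. A standalone sketch of how a consumer branches on that; the function name is hypothetical:

```scala
// Standalone sketch of how the plugin consumes the optional format.
// On the 311-style shim this is always Some(format); on 340-style shims a
// None means no dateFormat was supplied, which selects legacy parsing.
def describeParser(optionalFormat: Option[String]): String = optionalFormat match {
  case None               => "legacy lenient parsing (yyyy, yyyy-MM, yyyy-MM-dd)"
  case Some("yyyy-MM-dd") => "strict yyyy-MM-dd matching"
  case Some(other)        => s"unsupported on GPU: $other (falls back to CPU)"
}
```
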
diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
index 9defeb992f8..9a1e34bfe12 100644
--- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
+++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
@@ -30,9 +30,19 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.internal.SQLConf
 
 object GpuJsonUtils {
+
+  def optionalDateFormatInRead(options: Map[String, String]): Option[String] =
+    optionalDateFormatInRead(parseJSONReadOptions(options))
+
+  def optionalDateFormatInRead(options: JSONOptions): Option[String] =
+    options.dateFormatInRead
+
   def dateFormatInRead(options: JSONOptions): String =
     options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)
+
+  def dateFormatInRead(options: Map[String, String]): String =
+    dateFormatInRead(parseJSONReadOptions(options))
+
   def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
     if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -41,4 +51,11 @@ object GpuJsonUtils {
     })
 
   def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false
+
+  def parseJSONReadOptions(options: Map[String, String]) = {
+    new JSONOptionsInRead(
+      options,
+      SQLConf.get.sessionLocalTimeZone,
+      SQLConf.get.columnNameOfCorruptRecord)
+  }
 }
diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala
new file mode 100644
index 00000000000..88560143f2e
--- /dev/null
+++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*** spark-rapids-shim-json-lines
+{"spark": "340"}
+{"spark": "341"}
+{"spark": "341db"}
+{"spark": "350"}
+spark-rapids-shim-json-lines ***/
+package com.nvidia.spark.rapids.shims
+
+import ai.rapids.cudf.{ColumnVector, Scalar}
+import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.GpuCast
+
+import org.apache.spark.sql.catalyst.json.GpuJsonUtils
+
+object GpuJsonToStructsShim {
+
+  def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
+    GpuJsonUtils.optionalDateFormatInRead(options) match {
+      case None =>
+        // legacy behavior
+        withResource(Scalar.fromString(" ")) { space =>
+          withResource(input.strip(space)) { trimmed =>
+            GpuCast.castStringToDate(trimmed)
+          }
+        }
+      case Some("yyyy-MM-dd") =>
+        GpuCast.convertDateOrNull(input, "^[0-9]{4}-[0-9]{2}-[0-9]{2}$", "%Y-%m-%d")
+      case other =>
+        // should be unreachable due to GpuOverrides checks
+        throw new IllegalStateException(s"Unsupported dateFormat $other")
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
index dcb39c5c636..92c1c17bba5 100644
--- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
+++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala
@@ -25,8 +25,29 @@ package org.apache.spark.sql.catalyst.json
 import com.nvidia.spark.rapids.shims.LegacyBehaviorPolicyShim
 
 import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.internal.SQLConf
 
 object GpuJsonUtils {
+
+  def optionalDateFormatInRead(options: JSONOptions): Option[String] =
+    options.dateFormatInRead
+
+  def optionalDateFormatInRead(options: Map[String, String]): Option[String] = {
+    val parsedOptions = new JSONOptionsInRead(
+      options,
+      SQLConf.get.sessionLocalTimeZone,
+      SQLConf.get.columnNameOfCorruptRecord)
+    optionalDateFormatInRead(parsedOptions)
+  }
+
+  /**
+   * Return the dateFormat that Spark will use in JSONOptions, or return a default. Note that this
+   * does not match Spark's behavior in all cases, because dateFormat is intentionally optional.
+   *
+   * For example, in `org.apache.spark.sql.catalyst.util.DateFormatter#getFormatter`, a legacy
+   * formatter will be used if no dateFormat is specified, and it does not correspond to
+   * `DateFormatter.defaultPattern`.
+   */
   def dateFormatInRead(options: JSONOptions): String =
     options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)