Commit

Add support for parsing strings as dates in from_json [databricks] (#9666)

* improve tests

* remove debug

* update docs, test for nulls

Signed-off-by: Andy Grove <[email protected]>

* add link to issue

* Revert newline

* Improve error message

* update docs

* Add shims

* add tests with leading and trailing whitespace

* handle more whitespace edge cases

* handle more whitespace edge cases

* xfail instead of skip, fix invalid regex

* Use GpuJsonUtils.dateFormatInRead

* More use of parsed JSONOptions

* bug fix and test for fallback for unsupported dateFormat

* update shims

* fix 330 shim

* Fallback for LEGACY timeParserPolicy and add fallback tests

* fix 350 build

* fix build on 321db

* fix build on 321db

---------

Signed-off-by: Andy Grove <[email protected]>
andygrove authored Nov 20, 2023
1 parent 9ed98c8 commit 16515f7
Showing 13 changed files with 307 additions and 11 deletions.
10 changes: 8 additions & 2 deletions docs/compatibility.md
@@ -322,9 +322,15 @@ This particular function supports to output a map or struct type with limited fu
The `from_json` function is disabled by default because it is experimental and has some known incompatibilities
with Spark, and can be enabled by setting `spark.rapids.sql.expression.JsonToStructs=true`.

There are several known issues:
Dates are partially supported but there are some known issues:

Dates and timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
- Only the default `dateFormat` of `yyyy-MM-dd` is supported. The query will fall back to CPU if any other format
is specified ([#9667](https://github.com/NVIDIA/spark-rapids/issues/9667))
- Strings containing integers with more than four digits will be
parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4
will parse these numbers as number of days since the epoch, and in Spark 3.4 and later, an exception will be thrown.

Timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
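
As a usage illustration, here is a minimal sketch against the public Spark API (not part of this commit; it assumes a SparkSession named `spark` with the RAPIDS plugin on the classpath):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DateType, StructField, StructType}
import spark.implicits._

spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")

val df = Seq("""{ "a": "2023-11-20" }""", """{ "a": "invalid" }""").toDF("json")
val schema = StructType(Seq(StructField("a", DateType)))
// With the default dateFormat (yyyy-MM-dd) this should run on the GPU, with the
// invalid row parsing to null; a non-default dateFormat such as dd/MM/yyyy is
// expected to fall back to the CPU per the note above.
df.select(from_json($"json", schema).alias("parsed")).show()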

When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting
for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
78 changes: 78 additions & 0 deletions integration_tests/src/main/python/json_test.py
@@ -523,6 +523,84 @@ def test_from_json_struct_decimal():
            .select(f.from_json('a', 'struct<a:decimal>')),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('date_gen', [
    # "yyyy-MM-dd"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "yyyy-MM"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "yyyy"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[0-9]{4}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "dd/MM/yyyy"
    "\"[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}\"",
    # special constant values
    "\"(now|today|tomorrow|epoch)\"",
    # "nnnnn" (number of days since epoch prior to Spark 3.4, throws exception from 3.4)
    pytest.param("\"[0-9]{5}\"", marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")),
    # integral
    "[0-9]{1,5}",
    # floating-point
    "[0-9]{0,2}\\.[0-9]{1,2}",
    # boolean
    "(true|false)"
])
@pytest.mark.parametrize('date_format', [
    "",
    "yyyy-MM-dd",
    # https://github.com/NVIDIA/spark-rapids/issues/9667
    pytest.param("dd/MM/yyyy", marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
@pytest.mark.parametrize('time_parser_policy', [
    pytest.param("LEGACY", marks=pytest.mark.allow_non_gpu('ProjectExec')),
    "CORRECTED"
])
def test_from_json_struct_date(date_gen, date_format, time_parser_policy):
    json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
        .with_special_case('{ "a": null }') \
        .with_special_case('null')
    options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': time_parser_policy})

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""])
@pytest.mark.parametrize('date_format', [
    "",
    "yyyy-MM-dd",
])
def test_from_json_struct_date_fallback_legacy(date_gen, date_format):
    json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
        .with_special_case('{ "a": null }') \
        .with_special_case('null')
    options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
    assert_gpu_fallback_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
        'ProjectExec',
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': 'LEGACY'})

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""])
@pytest.mark.parametrize('date_format', [
    "dd/MM/yyyy",
    "yyyy/MM/dd",
])
def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format):
    json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
        .with_special_case('{ "a": null }') \
        .with_special_case('null')
    options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
    assert_gpu_fallback_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
        'ProjectExec',
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})


@pytest.mark.parametrize('schema', ['struct<teacher:string>',
    'struct<student:struct<name:string,age:int>>',
    'struct<teacher:string,student:struct<name:string,age:int>>'])
@@ -1347,7 +1347,7 @@ object GpuCast {
* `yyyy-[m]m-[d]d *`
* `yyyy-[m]m-[d]dT*`
*/
private def castStringToDate(sanitizedInput: ColumnVector): ColumnVector = {
def castStringToDate(sanitizedInput: ColumnVector): ColumnVector = {

// convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd
val converted = convertDateOr(sanitizedInput, DATE_REGEX_YYYY_MM_DD, "%Y-%m-%d",
@@ -3570,7 +3570,7 @@ object GpuOverrides extends Logging {
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral +
TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN) +
TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE) +
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type"),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
@@ -3584,7 +3584,7 @@
willNotWorkOnGpu("from_json on GPU only supports MapType<StringType, StringType> " +
"or StructType schema")
}
GpuJsonScan.tagJsonToStructsSupport(a.options, this)
GpuJsonScan.tagJsonToStructsSupport(a.options, a.dataType, this)
}

override def convertToGpu(child: Expression): GpuExpression =
@@ -25,7 +25,7 @@ import ai.rapids.cudf
import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, NvtxColor, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, ShimFilePartitionReaderFactory}
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
@@ -40,7 +40,8 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, Partitioning
import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
@@ -103,12 +104,27 @@ object GpuJsonScan {
}

def tagJsonToStructsSupport(options:Map[String, String],
meta: RapidsMeta[_, _, _]): Unit = {
dt: DataType,
meta: RapidsMeta[_, _, _]): Unit = {
val parsedOptions = new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)

val hasDates = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[DateType])
if (hasDates) {
GpuJsonUtils.optionalDateFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd") =>
// this is fine
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
}

if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) {
meta.willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuJsonToStructs")
}

tagSupportOptions(parsedOptions, meta)
}
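
For context, TrampolineUtil.dataTypeExistsRecursively walks a (possibly nested)
schema looking for a matching type. A standalone sketch of an equivalent check,
written here for illustration only (a hypothetical helper, not the plugin's
implementation):

import org.apache.spark.sql.types._

// Returns true if `pred` holds for `dt` or any type nested inside it.
def existsRecursively(dt: DataType, pred: DataType => Boolean): Boolean = dt match {
  case s: StructType => pred(s) || s.fields.exists(f => existsRecursively(f.dataType, pred))
  case a: ArrayType => pred(a) || existsRecursively(a.elementType, pred)
  case m: MapType => pred(m) || existsRecursively(m.keyType, pred) || existsRecursively(m.valueType, pred)
  case other => pred(other)
}

// existsRecursively(schema, _.isInstanceOf[DateType]) mirrors the hasDates check above.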

@@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.jni.MapUtils
import com.nvidia.spark.rapids.shims.GpuJsonToStructsShim
import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
@@ -210,6 +211,10 @@ case class GpuJsonToStructs(
(sparkType, dtype) match {
case (DataTypes.StringType, DataTypes.BooleanType) =>
castJsonStringToBool(col)
case (DataTypes.StringType, DataTypes.DateType) =>
GpuJsonToStructsShim.castJsonStringToDate(col, options)
case (_, DataTypes.DateType) =>
castToNullDate(input.getBase)
case _ => doCast(col, sparkType, dtype)
}

@@ -235,7 +240,7 @@

private def castJsonStringToBool(input: ColumnVector): ColumnVector = {
val isTrue = withResource(Scalar.fromString("true")) { trueStr =>
input.equalTo(trueStr)
input.equalTo(trueStr)
}
withResource(isTrue) { _ =>
val isFalse = withResource(Scalar.fromString("false")) { falseStr =>
@@ -256,6 +261,12 @@
}
}

private def castToNullDate(input: ColumnVector): ColumnVector = {
withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { nullScalar =>
ColumnVector.fromScalar(nullScalar, input.getRowCount.toInt)
}
}

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

@@ -0,0 +1,60 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*** spark-rapids-shim-json-lines
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, Scalar}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuCast

import org.apache.spark.sql.catalyst.json.GpuJsonUtils

object GpuJsonToStructsShim {

def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
GpuJsonUtils.dateFormatInRead(options) match {
case "yyyy-MM-dd" =>
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
}
}
case other =>
// should be unreachable due to GpuOverrides checks
throw new IllegalStateException(s"Unsupported dateFormat $other")
}
}

}
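
A hedged usage sketch for the shim above (illustrative only; it needs a GPU and
the cuDF runtime, and the expected values are inferred from the strip-then-cast
logic rather than verified output):

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.GpuJsonToStructsShim

withResource(ColumnVector.fromStrings(" 2023-11-20 ", "2023-11-20", "not-a-date")) { input =>
  // empty options should resolve to the default dateFormat, yyyy-MM-dd
  withResource(GpuJsonToStructsShim.castJsonStringToDate(input, Map.empty)) { dates =>
    // rows 0 and 1 should parse to 2023-11-20 (surrounding spaces are stripped);
    // row 2 should become null
  }
}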
@@ -27,8 +27,28 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.catalyst.json

import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {

def optionalDateFormatInRead(options: JSONOptions): Option[String] =
Some(options.dateFormat)

def optionalDateFormatInRead(options: Map[String, String]): Option[String] =
optionalDateFormatInRead(parseJSONReadOptions(options))

def dateFormatInRead(options: JSONOptions): String = options.dateFormat

def dateFormatInRead(options: Map[String, String]): String =
dateFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat
def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)
}
}
@@ -23,9 +23,19 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {

def optionalDateFormatInRead(options: Map[String, String]): Option[String] = {
optionalDateFormatInRead(parseJSONReadOptions(options))
}
def optionalDateFormatInRead(options: JSONOptions): Option[String] =
options.dateFormatInRead

def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def dateFormatInRead(options: Map[String, String]): String =
dateFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -34,4 +44,12 @@
})

def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)
}

}
@@ -30,9 +30,19 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {

def optionalDateFormatInRead(options: Map[String, String]): Option[String] =
optionalDateFormatInRead(parseJSONReadOptions(options))

def optionalDateFormatInRead(options: JSONOptions): Option[String] =
options.dateFormatInRead

def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def dateFormatInRead(options: Map[String, String]): String =
dateFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -41,4 +51,11 @@
})

def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)
}
}