Add support for parsing strings as dates in from_json [databricks] #9666

Merged: 23 commits merged on Nov 20, 2023. Changes shown from 19 commits.
10 changes: 8 additions & 2 deletions docs/compatibility.md
@@ -322,9 +322,15 @@ This particular function supports to output a map or struct type with limited fu
The `from_json` function is disabled by default because it is experimental and has some known incompatibilities
with Spark, and can be enabled by setting `spark.rapids.sql.expression.JsonToStructs=true`.

There are several known issues:
Dates are partially supported but there are some known issues:

Dates and timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
- Only the default `dateFormat` of `yyyy-MM-dd` is supported. The query will fall back to CPU if any other format
is specified ([#9667](https://github.com/NVIDIA/spark-rapids/issues/9667))
- Strings containing integers with more than four digits will be
parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)), whereas Spark versions prior to 3.4
parse these numbers as a number of days since the epoch, and Spark 3.4 and later throw an exception
(see the example below).

Timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
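
As an illustration of the supported path, here is a minimal PySpark snippet (the session setup, data, and column name are illustrative, not part of this change). With the default `dateFormat`, strings like `"2023-11-20"` are parsed on the GPU, while a five-digit string such as `"12345"` hits the null-parsing issue above:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
# Enable the experimental GPU implementation of from_json
spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")

df = spark.createDataFrame(
    [('{ "a": "2023-11-20" }',),  # default yyyy-MM-dd format
     ('{ "a": "12345" }',)],      # more than four digits: parsed as null on the GPU (#9664)
    "json STRING")
df.select(f.from_json("json", "struct<a:date>")).show(truncate=False)
```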

When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting
for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).
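
A quick sketch of that difference, reusing the `spark` session and `f` import from the snippet above (the values are illustrative):

```python
# The GPU accepts the leading zeros regardless of allowNumericLeadingZeros;
# CPU Spark only accepts them when that JSON option is enabled (#9588).
df_zeros = spark.createDataFrame([('{ "a": 007 }',)], "json STRING")
df_zeros.select(f.from_json("json", "struct<a:int>")).show()
```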
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
78 changes: 78 additions & 0 deletions integration_tests/src/main/python/json_test.py
@@ -523,6 +523,84 @@ def test_from_json_struct_decimal():
.select(f.from_json('a', 'struct<a:decimal>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('date_gen', [
# "yyyy-MM-dd"
"\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
# "yyyy-MM"
"\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
# "yyyy"
"\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[0-9]{4}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
# "dd/MM/yyyy"
"\"[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}\"",
# special constant values
"\"(now|today|tomorrow|epoch)\"",
# "nnnnn" (number of days since epoch prior to Spark 3.4, throws exception from 3.4)
pytest.param("\"[0-9]{5}\"", marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")),
# integral
"[0-9]{1,5}",
# floating-point
"[0-9]{0,2}\\.[0-9]{1,2}"
# boolean
"(true|false)"
])
@pytest.mark.parametrize('date_format', [
"",
"yyyy-MM-dd",
# https://github.com/NVIDIA/spark-rapids/issues/9667
pytest.param("dd/MM/yyyy", marks=pytest.mark.allow_non_gpu('ProjectExec')),
# Review comment (collaborator): nit: I meant a separate test that does a fallback check, but I think this is okay.

])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param("LEGACY", marks=pytest.mark.allow_non_gpu('ProjectExec')),
"CORRECTED"
])
def test_from_json_struct_date(date_gen, date_format, time_parser_policy):
json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
.with_special_case('{ "a": null }') \
.with_special_case('null')
options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
conf={"spark.rapids.sql.expression.JsonToStructs": True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""])
@pytest.mark.parametrize('date_format', [
"",
"yyyy-MM-dd",
])
def test_from_json_struct_date_fallback_legacy(date_gen, date_format):
json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
.with_special_case('{ "a": null }') \
.with_special_case('null')
options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
'ProjectExec',
conf={"spark.rapids.sql.expression.JsonToStructs": True,
'spark.sql.legacy.timeParserPolicy': 'LEGACY'})

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('date_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}\""])
@pytest.mark.parametrize('date_format', [
"dd/MM/yyyy",
"yyyy/MM/dd",
])
def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format):
json_string_gen = StringGen(r'{ "a": ' + date_gen + ' }') \
.with_special_case('{ "a": null }') \
.with_special_case('null')
options = { 'dateFormat': date_format } if len(date_format) > 0 else { }
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.col('a'), f.from_json('a', 'struct<a:date>', options)),
'ProjectExec',
conf={"spark.rapids.sql.expression.JsonToStructs": True,
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})


@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:struct<name:string,age:int>>',
'struct<teacher:string,student:struct<name:string,age:int>>'])
@@ -1337,7 +1337,7 @@ object GpuCast {
* `yyyy-[m]m-[d]d *`
* `yyyy-[m]m-[d]dT*`
*/
private def castStringToDate(sanitizedInput: ColumnVector): ColumnVector = {
def castStringToDate(sanitizedInput: ColumnVector): ColumnVector = {

// convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd
val converted = convertDateOr(sanitizedInput, DATE_REGEX_YYYY_MM_DD, "%Y-%m-%d",
@@ -3570,21 +3570,22 @@
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral +
TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN) +
TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE) +
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type"),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
override def tagExprForGpu(): Unit = {
a.schema match {
case MapType(_: StringType, _: StringType, _) => ()
case _: StructType => ()
case _ =>
willNotWorkOnGpu("from_json on GPU only supports MapType<StringType, StringType> " +
"or StructType schema")
}
GpuJsonScan.tagJsonToStructsSupport(a.options, this)
GpuJsonScan.tagJsonToStructsSupport(a.options, a.dataType, this)
}

override def convertToGpu(child: Expression): GpuExpression =
// GPU implementation currently does not support duplicated json key names in input
@@ -25,7 +25,7 @@ import ai.rapids.cudf
import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, NvtxColor, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.ShimFilePartitionReaderFactory
import com.nvidia.spark.rapids.shims.{LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
@@ -40,7 +40,8 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, Partitioning
import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
@@ -103,12 +104,27 @@ object GpuJsonScan {
}

def tagJsonToStructsSupport(options:Map[String, String],
meta: RapidsMeta[_, _, _]): Unit = {
dt: DataType,
meta: RapidsMeta[_, _, _]): Unit = {
val parsedOptions = new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)

val hasDates = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[DateType])
if (hasDates) {
GpuJsonUtils.optionalDateFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd") =>
// this is fine
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
}

if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) {
meta.willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuJsonToStructs")
}

tagSupportOptions(parsedOptions, meta)
}
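
In user-facing terms, these new checks tag `from_json` for CPU fallback in cases like the following sketch (assumed `spark`/`df`/column names, mirroring the fallback tests above):

```python
# 1) A non-default dateFormat on a schema that contains a date field (#9667)
df.select(f.from_json("json", "struct<a:date>", {"dateFormat": "dd/MM/yyyy"}))

# 2) LEGACY timeParserPolicy, independent of the dateFormat
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
df.select(f.from_json("json", "struct<a:date>"))
```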

@@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.jni.MapUtils
import com.nvidia.spark.rapids.shims.GpuJsonToStructsShim
import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
@@ -210,6 +211,10 @@ case class GpuJsonToStructs(
(sparkType, dtype) match {
case (DataTypes.StringType, DataTypes.BooleanType) =>
castJsonStringToBool(col)
case (DataTypes.StringType, DataTypes.DateType) =>
GpuJsonToStructsShim.castJsonStringToDate(col, options)
case (_, DataTypes.DateType) =>
castToNullDate(input.getBase)
case _ => doCast(col, sparkType, dtype)
}

@@ -235,7 +240,7 @@

private def castJsonStringToBool(input: ColumnVector): ColumnVector = {
val isTrue = withResource(Scalar.fromString("true")) { trueStr =>
input.equalTo(trueStr)
input.equalTo(trueStr)
}
withResource(isTrue) { _ =>
val isFalse = withResource(Scalar.fromString("false")) { falseStr =>
@@ -256,6 +261,12 @@
}
}

private def castToNullDate(input: ColumnVector): ColumnVector = {
withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { nullScalar =>
ColumnVector.fromScalar(nullScalar, input.getRowCount.toInt)
}
}

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
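
A behavioral note on `castToNullDate`: a date field whose underlying JSON values are not read as strings (for example, bare numbers) is routed through this null-date path, and the integral and floating-point cases in `test_from_json_struct_date` above compare that behavior against the CPU. A hypothetical repro (assumed names):

```python
# The unquoted number is not read as a string column, so the GPU path
# falls through to castToNullDate and yields a null date.
df_num = spark.createDataFrame([('{ "a": 123 }',)], "json STRING")
df_num.select(f.from_json("json", "struct<a:date>")).show()
```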

@@ -0,0 +1,60 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*** spark-rapids-shim-json-lines
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, Scalar}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuCast

import org.apache.spark.sql.catalyst.json.GpuJsonUtils

object GpuJsonToStructsShim {

def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
GpuJsonUtils.dateFormatInRead(options) match {
case "yyyy-MM-dd" =>
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
}
}
case other =>
// should be unreachable due to GpuOverrides checks
throw new IllegalStateException(s"Unsupported dateFormat $other")
}
}

}
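
For the Spark versions this shim covers, the string is stripped of surrounding spaces and handed to the regular string-to-date cast; only the default `yyyy-MM-dd` format should reach this point, since other formats are tagged for fallback in `GpuJsonScan`. A small illustration of the intended behavior (assumed names):

```python
# Surrounding spaces are stripped before the cast to DATE
df_ws = spark.createDataFrame([('{ "a": " 2023-11-20 " }',)], "json STRING")
df_ws.select(f.from_json("json", "struct<a:date>")).show(truncate=False)
```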
@@ -27,8 +27,28 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.catalyst.json

import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {

def optionalDateFormatInRead(options: JSONOptions): Option[String] =
Some(options.dateFormat)

def optionalDateFormatInRead(options: Map[String, String]): Option[String] =
optionalDateFormatInRead(parseJSONReadOptions(options))

def dateFormatInRead(options: JSONOptions): String = options.dateFormat

def dateFormatInRead(options: Map[String, String]): String =
dateFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat
def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)
}
}
@@ -23,9 +23,19 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {

def optionalDateFormatInRead(options: Map[String, String]): Option[String] = {
optionalDateFormatInRead(parseJSONReadOptions(options))
}
def optionalDateFormatInRead(options: JSONOptions): Option[String] =
options.dateFormatInRead

def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def dateFormatInRead(options: Map[String, String]): String =
dateFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -34,4 +44,12 @@ object GpuJsonUtils {
})

def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)
}

}
@@ -30,9 +30,19 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.internal.SQLConf

object GpuJsonUtils {

def optionalDateFormatInRead(options: Map[String, String]): Option[String] =
optionalDateFormatInRead(parseJSONReadOptions(options))

def optionalDateFormatInRead(options: JSONOptions): Option[String] =
options.dateFormatInRead

def dateFormatInRead(options: JSONOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

def dateFormatInRead(options: Map[String, String]): String =
dateFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -41,4 +51,11 @@ object GpuJsonUtils {
})

def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)
}
}