Support timestamp in from_json [databricks] (#9720)
* Support timestamp in from_json

* fix shims

* fix shims

* signoff

Signed-off-by: Andy Grove <[email protected]>

* improve tests

* fix 321db shim

* update compatibility guide

---------

Signed-off-by: Andy Grove <[email protected]>
andygrove authored Nov 21, 2023
1 parent 1afead7 commit f354ddf
Showing 13 changed files with 181 additions and 14 deletions.
10 changes: 9 additions & 1 deletion docs/compatibility.md
@@ -330,7 +330,15 @@ Dates are partially supported but there are some known issues:
parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4
will parse these numbers as the number of days since the epoch, and in Spark 3.4 and later, an exception will be thrown.

Timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
Timestamps are partially supported but there are some known issues:

- Only the default `timestampFormat` of `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]` is supported. The query will fall back to CPU if any other format
  is specified ([#9723](https://github.com/NVIDIA/spark-rapids/issues/9723))
- Strings containing integers with more than four digits will be
  parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4
  will parse these numbers as the number of days since the epoch, and in Spark 3.4 and later, an exception will be thrown.
- Strings containing special date constant values such as `now` and `today` will parse as null ([#9724](https://github.com/NVIDIA/spark-rapids/issues/9724)),
  which differs from the behavior in Spark 3.1.x.
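
For example, the following sketch (assuming a spark-shell session with the RAPIDS Accelerator loaded and `spark.rapids.sql.expression.JsonToStructs` enabled; the data is illustrative) stays on the GPU because it relies on the default `timestampFormat`:

```scala
// Minimal sketch: parse a JSON string column into struct<a:timestamp> on the GPU.
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
import spark.implicits._ // pre-imported in spark-shell

val df = Seq("""{"a": "2023-11-21T12:34:56.789Z"}""", """{"a": null}""").toDF("json")
val schema = StructType(Seq(StructField("a", TimestampType)))

// No timestampFormat option is set, so the default applies and the expression is
// GPU-eligible; any other format (or LEGACY timeParserPolicy) falls back to CPU.
df.select(from_json(col("json"), schema).as("parsed")).show(false)
```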

When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting
for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).
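
A short illustration of that difference (same session assumptions as the sketch above; the exact CPU result depends on the JSON parser mode):

```scala
// Sketch: with allowNumericLeadingZeros at its default (false), CPU Spark treats
// `007` as malformed JSON, while the GPU parser accepts it and returns 7.
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import spark.implicits._

Seq("""{"n": 007}""").toDF("json")
  .select(from_json(col("json"), StructType(Seq(StructField("n", IntegerType)))))
  .show()
```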
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
84 changes: 83 additions & 1 deletion integration_tests/src/main/python/json_test.py
@@ -21,7 +21,7 @@
from datetime import timezone
from conftest import is_databricks_runtime
from marks import approximate_float, allow_non_gpu, ignore_order
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330, is_before_spark_340, \
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_320, is_before_spark_330, is_before_spark_340, \
    is_before_spark_341

json_supported_gens = [
@@ -600,6 +600,88 @@ def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})

@pytest.mark.parametrize('timestamp_gen', [
    # "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}(\\.[0-9]{1,6})?Z?[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "yyyy-MM-dd"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "yyyy-MM"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "yyyy"
    "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[0-9]{4}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"",
    # "dd/MM/yyyy"
    "\"[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}\"",
    # special constant values
    pytest.param("\"(now|today|tomorrow|epoch)\"", marks=pytest.mark.xfail(condition=is_before_spark_320(), reason="https://github.com/NVIDIA/spark-rapids/issues/9724")),
    # "nnnnn" (number of days since epoch prior to Spark 3.4, throws exception from 3.4)
    pytest.param("\"[0-9]{5}\"", marks=pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")),
    # integral
    pytest.param("[0-9]{1,5}", marks=pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9588")),
    "[1-9]{1,8}",
    # floating-point
    "[0-9]{0,2}\\.[0-9]{1,2}",
    # boolean
    "(true|false)"
])
@pytest.mark.parametrize('timestamp_format', [
    "",
    "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
    # https://github.com/NVIDIA/spark-rapids/issues/9723
    pytest.param("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", marks=pytest.mark.allow_non_gpu('ProjectExec')),
    pytest.param("dd/MM/yyyy'T'HH:mm:ss[.SSS][XXX]", marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
@pytest.mark.parametrize('time_parser_policy', [
    pytest.param("LEGACY", marks=pytest.mark.allow_non_gpu('ProjectExec')),
    "CORRECTED"
])
@pytest.mark.parametrize('ansi_enabled', [ True, False ])
def test_from_json_struct_timestamp(timestamp_gen, timestamp_format, time_parser_policy, ansi_enabled):
    json_string_gen = StringGen(r'{ "a": ' + timestamp_gen + ' }') \
        .with_special_case('{ "a": null }') \
        .with_special_case('null')
    options = { 'timestampFormat': timestamp_format } if len(timestamp_format) > 0 else { }
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.col('a'), f.from_json('a', 'struct<a:timestamp>', options)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': time_parser_policy,
              'spark.sql.ansi.enabled': ansi_enabled })

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('timestamp_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}(\\.[0-9]{1,6})?Z?\""])
@pytest.mark.parametrize('timestamp_format', [
    "",
    "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
])
def test_from_json_struct_timestamp_fallback_legacy(timestamp_gen, timestamp_format):
    json_string_gen = StringGen(r'{ "a": ' + timestamp_gen + ' }') \
        .with_special_case('{ "a": null }') \
        .with_special_case('null')
    options = { 'timestampFormat': timestamp_format } if len(timestamp_format) > 0 else { }
    assert_gpu_fallback_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.col('a'), f.from_json('a', 'struct<a:timestamp>', options)),
        'ProjectExec',
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': 'LEGACY'})

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('timestamp_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}(\\.[0-9]{1,6})?Z?\""])
@pytest.mark.parametrize('timestamp_format', [
    "yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
    "dd/MM/yyyy'T'HH:mm:ss[.SSS][XXX]",
])
def test_from_json_struct_timestamp_fallback_non_default_format(timestamp_gen, timestamp_format):
    json_string_gen = StringGen(r'{ "a": ' + timestamp_gen + ' }') \
        .with_special_case('{ "a": null }') \
        .with_special_case('null')
    options = { 'timestampFormat': timestamp_format } if len(timestamp_format) > 0 else { }
    assert_gpu_fallback_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.col('a'), f.from_json('a', 'struct<a:timestamp>', options)),
        'ProjectExec',
        conf={"spark.rapids.sql.expression.JsonToStructs": True,
              'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})

@pytest.mark.parametrize('schema', ['struct<teacher:string>',
                                    'struct<student:struct<name:string,age:int>>',
@@ -1379,7 +1379,7 @@ object GpuCast {
  }

  /** This method does not close the `input` ColumnVector. */
  private def convertTimestampOrNull(
  def convertTimestampOrNull(
      input: ColumnVector,
      regex: String,
      cudfFormat: String): ColumnVector = {
@@ -1463,7 +1463,7 @@
    }
  }

  private def castStringToTimestamp(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
  def castStringToTimestamp(input: ColumnVector, ansiMode: Boolean): ColumnVector = {

    // special timestamps
    val today = DateUtils.currentDate()
@@ -3570,7 +3570,8 @@ object GpuOverrides extends Logging {
      "Returns a struct value with the given `jsonStr` and `schema`",
      ExprChecks.projectOnly(
        TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral +
          TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE) +
          TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE +
          TypeSig.TIMESTAMP) +
        TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
          "MAP only supports keys and values that are of STRING type"),
        (TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
@@ -121,6 +121,16 @@ object GpuJsonScan {
      }
    }

    val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[TimestampType])
    if (hasTimestamps) {
      GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions) match {
        case None | Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") =>
          // this is fine
        case timestampFormat =>
          meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported timestampFormat $timestampFormat")
      }
    }

    if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) {
      meta.willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuJsonToStructs")
    }
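
Restating the gate above as a tiny standalone sketch (a hypothetical helper, not code from this commit):

```scala
// Only a missing timestampFormat or the exact default pattern keeps
// JsonToStructs on the GPU; anything else triggers CPU fallback.
def timestampFormatSupported(format: Option[String]): Boolean = format match {
  case None | Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") => true
  case _ => false
}

assert(timestampFormatSupported(None))
assert(timestampFormatSupported(Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")))
assert(!timestampFormatSupported(Some("dd/MM/yyyy'T'HH:mm:ss[.SSS][XXX]"))) // falls back
```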
@@ -20,7 +20,7 @@ import ai.rapids.cudf
import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.GpuCast
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.jni.MapUtils
import com.nvidia.spark.rapids.shims.GpuJsonToStructsShim
@@ -215,7 +215,13 @@ case class GpuJsonToStructs(
          GpuJsonToStructsShim.castJsonStringToDate(col, options)
        case (_, DataTypes.DateType) =>
          castToNullDate(input.getBase)
        case (DataTypes.StringType, DataTypes.TimestampType) =>
          GpuJsonToStructsShim.castJsonStringToTimestamp(col, options)
        case (DataTypes.LongType, DataTypes.TimestampType) =>
          GpuCast.castLongToTimestamp(col, DataTypes.TimestampType)
        case (_, DataTypes.TimestampType) =>
          castToNullTimestamp(input.getBase)
        case _ => GpuCast.doCast(col, sparkType, dtype)
      }

    }
@@ -267,6 +273,12 @@
    }
  }

  private def castToNullTimestamp(input: ColumnVector): ColumnVector = {
    withResource(Scalar.fromNull(DType.TIMESTAMP_MICROSECONDS)) { nullScalar =>
      ColumnVector.fromScalar(nullScalar, input.getRowCount.toInt)
    }
  }

  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
    copy(timeZoneId = Option(timeZoneId))

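A minimal standalone sketch of the dispatch above, using hypothetical scalar types in place of ColumnVectors (the seconds-to-microseconds scaling for the integral case is an assumption about castLongToTimestamp, not taken from this diff):

```scala
import java.time.Instant
import scala.util.Try

sealed trait JsonValue
final case class JString(v: String) extends JsonValue
final case class JLong(v: Long) extends JsonValue
final case class JBool(v: Boolean) extends JsonValue

// Hypothetical stand-in for GpuJsonToStructsShim.castJsonStringToTimestamp:
// trim, then parse, with failures becoming None (i.e. null) rather than errors.
def parseTimestampMicros(s: String): Option[Long] =
  Try(Instant.parse(s.trim)).toOption
    .map(i => i.getEpochSecond * 1000000L + i.getNano / 1000L)

def toTimestampMicros(value: JsonValue): Option[Long] = value match {
  case JString(s)     => parseTimestampMicros(s)  // strings go through the parser
  case JLong(seconds) => Some(seconds * 1000000L) // integral -> timestamp (assumed scaling)
  case _              => None                     // any other JSON type -> null
}

assert(toTimestampMicros(JString("2023-11-21T12:34:56Z")).isDefined)
assert(toTimestampMicros(JString("not a timestamp")).isEmpty)
assert(toTimestampMicros(JBool(true)).isEmpty)
```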
@@ -57,4 +57,13 @@ object GpuJsonToStructsShim {
    }
  }

  def castJsonStringToTimestamp(input: ColumnVector,
      options: Map[String, String]): ColumnVector = {
    withResource(Scalar.fromString(" ")) { space =>
      withResource(input.strip(space)) { trimmed =>
        // from_json doesn't respect ansi mode
        GpuCast.castStringToTimestamp(trimmed, ansiMode = false)
      }
    }
  }
}
@@ -42,6 +42,11 @@ object GpuJsonUtils {
  def dateFormatInRead(options: Map[String, String]): String =
    dateFormatInRead(parseJSONReadOptions(options))

  def optionalTimestampFormatInRead(options: JSONOptions): Option[String] =
    Some(options.timestampFormat)

  def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] =
    optionalTimestampFormatInRead(parseJSONReadOptions(options))

  def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat

  def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

@@ -36,6 +36,12 @@ object GpuJsonUtils {
  def dateFormatInRead(options: Map[String, String]): String =
    dateFormatInRead(parseJSONReadOptions(options))

  def optionalTimestampFormatInRead(options: JSONOptions): Option[String] =
    options.timestampFormatInRead

  def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] =
    optionalTimestampFormatInRead(parseJSONReadOptions(options))

  def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
    if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -43,6 +43,12 @@ object GpuJsonUtils {
  def dateFormatInRead(options: Map[String, String]): String =
    dateFormatInRead(parseJSONReadOptions(options))

  def optionalTimestampFormatInRead(options: JSONOptions): Option[String] =
    options.timestampFormatInRead

  def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] =
    optionalTimestampFormatInRead(parseJSONReadOptions(options))

  def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
    if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -46,4 +46,23 @@ object GpuJsonToStructsShim {
    }
  }

  def castJsonStringToTimestamp(input: ColumnVector,
      options: Map[String, String]): ColumnVector = {
    options.get("timestampFormat") match {
      case None =>
        // legacy behavior
        withResource(Scalar.fromString(" ")) { space =>
          withResource(input.strip(space)) { trimmed =>
            // from_json doesn't respect ansi mode
            GpuCast.castStringToTimestamp(trimmed, ansiMode = false)
          }
        }
      case Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") =>
        GpuCast.convertTimestampOrNull(input,
          "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]{1,6})?Z?$", "%Y-%m-%d")
      case other =>
        // should be unreachable due to GpuOverrides checks
        throw new IllegalStateException(s"Unsupported timestampFormat $other")
    }
  }
}
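
A quick plain-JVM check of the validation pattern above (assuming cudf's regex semantics agree with Java's for this pattern):

```scala
// The default-format path only admits fully-formed timestamps;
// anything else is converted to null rather than raising an error.
val pattern = "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]{1,6})?Z?$"
assert("2023-11-21T12:34:56.789Z".matches(pattern))
assert("2023-11-21T12:34:56".matches(pattern))
assert(!"2023-11-21".matches(pattern))           // bare date -> null on this path
assert(!" 2023-11-21T12:34:56".matches(pattern)) // leading whitespace is not admitted
```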
@@ -33,11 +33,7 @@ object GpuJsonUtils {
    options.dateFormatInRead

  def optionalDateFormatInRead(options: Map[String, String]): Option[String] = {
    val parsedOptions = new JSONOptionsInRead(
      options,
      SQLConf.get.sessionLocalTimeZone,
      SQLConf.get.columnNameOfCorruptRecord)
    optionalDateFormatInRead(parsedOptions)
    optionalDateFormatInRead(parseJSONReadOptions(options))
  }

  /**
@@ -51,6 +47,12 @@
  def dateFormatInRead(options: JSONOptions): String =
    options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)

  def optionalTimestampFormatInRead(options: JSONOptions): Option[String] =
    options.timestampFormatInRead

  def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] =
    optionalTimestampFormatInRead(parseJSONReadOptions(options))

  def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse(
    if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy()) {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
@@ -60,4 +62,11 @@

  def enableDateTimeParsingFallback(options: JSONOptions): Boolean =
    options.enableDateTimeParsingFallback.getOrElse(false)

  def parseJSONReadOptions(options: Map[String, String]) = {
    new JSONOptionsInRead(
      options,
      SQLConf.get.sessionLocalTimeZone,
      SQLConf.get.columnNameOfCorruptRecord)
  }
}
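
Hypothetical usage of the new helpers (a sketch: it assumes an active SparkSession so `SQLConf.get` resolves, and imports the `GpuJsonUtils` shim matching your Spark version):

```scala
// Sketch: querying the user-supplied timestampFormat without a default filled in.
val unset = GpuJsonUtils.optionalTimestampFormatInRead(Map.empty[String, String])
// unset == None on Spark versions where timestampFormatInRead is optional

val set = GpuJsonUtils.optionalTimestampFormatInRead(
  Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"))
// set == Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")
```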
