Skip to content

Commit

Permalink
Use parser from spark to normalize json path in GetJsonObject (#10466)
Browse files Browse the repository at this point in the history
* remove leading space for json path in GetJsonObject

Signed-off-by: Haoyang Li <[email protected]>

* Update comments

Signed-off-by: Haoyang Li <[email protected]>

* Use JsonPathParser to normalize path

Signed-off-by: Haoyang Li <[email protected]>

* Update compatibility doc

Signed-off-by: Haoyang Li <[email protected]>

* clean up

Signed-off-by: Haoyang Li <[email protected]>

* Fallback json paths containing wildcards in GetJsonObject

Signed-off-by: Haoyang Li <[email protected]>

* cache normalizeJsonPath and prevent memory leak

Signed-off-by: Haoyang Li <[email protected]>

* clean up

Signed-off-by: Haoyang Li <[email protected]>

* ready to merge

Signed-off-by: Haoyang Li <[email protected]>

* Use parser to check whether to fallback

Signed-off-by: Haoyang Li <[email protected]>

* Add a special case

Signed-off-by: Haoyang Li <[email protected]>

---------

Signed-off-by: Haoyang Li <[email protected]>
  • Loading branch information
thirtiseven authored Feb 28, 2024
1 parent ac993f7 commit b9c292c
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 37 deletions.
10 changes: 0 additions & 10 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -479,16 +479,6 @@ The following is a list of known differences.

The following is a list of bugs in either the GPU version or arguably in Apache Spark itself.
* https://github.com/NVIDIA/spark-rapids/issues/10219 non-matching quotes in quoted strings
* https://github.com/NVIDIA/spark-rapids/issues/10213 array index notation works without root
* https://github.com/NVIDIA/spark-rapids/issues/10214 unquoted array index notation is not
supported
* https://github.com/NVIDIA/spark-rapids/issues/10215 leading spaces can be stripped from named
keys.
* https://github.com/NVIDIA/spark-rapids/issues/10216 It appears that Spark is flattening some
output, which is different from other implementations including the GPU version.
* https://github.com/NVIDIA/spark-rapids/issues/10217 a JSON path execution bug
* https://issues.apache.org/jira/browse/SPARK-46761 Apache Spark does not allow the `?` character in
a quoted JSON path string.

## Avro

Expand Down
54 changes: 37 additions & 17 deletions integration_tests/src/main/python/get_json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,19 @@ def test_get_json_object_single_quotes():
"$['key with spaces']",
"$.store.book",
"$.store.book[0]",
"$.store.book[*]",
pytest.param("$",marks=[
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'),
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196'),
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10194')]),
"$.store.book[0].category",
"$.store.book[*].category",
"$.store.book[*].isbn",
pytest.param("$.store.book[*].reader",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10216')),
"$.store.basket[0][1]",
"$.store.basket[*]",
"$.store.basket[*][0]",
"$.store.basket[0][*]",
"$.store.basket[*][*]",
"$.store.basket[0][2].b",
pytest.param("$.store.basket[0][*].b",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10217')),
"$.zip code",
"$.fb:testid",
pytest.param("$.a",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196')),
"$.non_exist_key",
pytest.param("$..no_recursive", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10212')),
"$.store.book[0].non_exist_key",
"$.store.basket[*].non_exist_key"])
"$..no_recursive",
"$.store.book[0].non_exist_key"])
def test_get_json_object_spark_unit_tests(query):
schema = StructType([StructField("jsonStr", StringType())])
data = [
Expand All @@ -110,6 +100,26 @@ def test_get_json_object_spark_unit_tests(query):
f.get_json_object('jsonStr', query)),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@allow_non_gpu("ProjectExec", "GetJsonObject")
@pytest.mark.parametrize('query',["$.store.basket[0][*].b",
                                  "$.store.book[*].reader",
                                  "$.store.book[*]",
                                  "$.store.book[*].category",
                                  "$.store.book[*].isbn",
                                  "$.store.basket[*]",
                                  "$.store.basket[*][0]",
                                  "$.store.basket[0][*]",
                                  "$.store.basket[*][*]",
                                  "$.store.basket[*].non_exist_key"])
def test_get_json_object_spark_unit_tests_fallback(query):
    # Every parametrized path here contains a wildcard ([*]), which the GPU
    # implementation of get_json_object does not support, so the plan is
    # expected to fall back to the CPU GetJsonObject expression.
    schema = StructType([StructField("jsonStr", StringType())])
    # Single-row fixture reused from Spark's own get_json_object unit tests.
    data = [['''{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],"basket":[[1,2,{"b":"y","a":"x"}],[3,4],[5,6]],"book":[{"author":"Nigel Rees","title":"Sayings of the Century","category":"reference","price":8.95},{"author":"Herman Melville","title":"Moby Dick","category":"fiction","price":8.99,"isbn":"0-553-21311-3"},{"author":"J. R. R. Tolkien","title":"The Lord of the Rings","category":"fiction","reader":[{"age":25,"name":"bob"},{"age":26,"name":"jack"}],"price":22.99,"isbn":"0-395-19395-8"}],"bicycle":{"price":19.95,"color":"red"}},"email":"amy@only_for_json_udf_test.net","owner":"amy","zip code":"94025","fb:testid":"1234"}''']]
    # Assert that GetJsonObject specifically is the operator that fell back.
    assert_gpu_fallback_collect(
        lambda spark: spark.createDataFrame(data,schema=schema).select(
            f.get_json_object('jsonStr', query)),
        "GetJsonObject",
        conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10218")
def test_get_json_object_normalize_non_string_output():
schema = StructType([StructField("jsonStr", StringType())])
Expand All @@ -132,7 +142,6 @@ def test_get_json_object_normalize_non_string_output():
f.get_json_object('jsonStr', '$')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://issues.apache.org/jira/browse/SPARK-46761")
def test_get_json_object_quoted_question():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'{"?":"QUESTION"}']]
Expand Down Expand Up @@ -221,7 +230,6 @@ def test_get_json_object_invalid_path():
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10213")
def test_get_json_object_top_level_array_notation():
# This is a special version of invalid path. It is something that the GPU supports
# but the CPU thinks is invalid
Expand All @@ -239,7 +247,6 @@ def test_get_json_object_top_level_array_notation():
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10214")
def test_get_json_object_unquoted_array_notation():
# This is a special version of invalid path. It is something that the GPU supports
# but the CPU thinks is invalid
Expand All @@ -257,27 +264,40 @@ def test_get_json_object_unquoted_array_notation():
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})


@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10215")
def test_get_json_object_white_space_removal():
# This is a special version of invalid path. It is something that the GPU supports
# but the CPU thinks is invalid
schema = StructType([StructField("jsonStr", StringType())])
data = [['{" a":" A"," b":" B"}'],
['{"a":"A","b":"B"}'],
['{"a ":"A ","b ":"B "}'],
['{" a ":" A "," b ":" B "}']]
['{" a ":" A "," b ":" B "}'],
['{" a ": {" a ":" A "}," b ": " B "}'],
['{" a":"b","a.a":"c","b":{"a":"ab"}}'],
['{" a":"b"," a. a":"c","b":{"a":"ab"}}'],
['{" a":"b","a .a ":"c","b":{"a":"ab"}}'],
['{" a":"b"," a . a ":"c","b":{"a":"ab"}}']
]

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.col('jsonStr'),
f.get_json_object('jsonStr', '$.a').alias('dot_a'),
f.get_json_object('jsonStr', '$. a').alias('dot_space_a'),
f.get_json_object('jsonStr', '$.\ta').alias('dot_tab_a'),
f.get_json_object('jsonStr', '$. a').alias('dot_spaces_a3'),
f.get_json_object('jsonStr', '$.a ').alias('dot_a_space'),
f.get_json_object('jsonStr', '$. a ').alias('dot_space_a_space'),
f.get_json_object('jsonStr', "$['b']").alias('dot_b'),
f.get_json_object('jsonStr', "$[' b']").alias('dot_space_b'),
f.get_json_object('jsonStr', "$['b ']").alias('dot_b_space'),
f.get_json_object('jsonStr', "$[' b ']").alias('dot_space_b_space'),
f.get_json_object('jsonStr', "$. a. a").alias('dot_space_a_dot_space_a'),
f.get_json_object('jsonStr', "$.a .a ").alias('dot_a_space_dot_a_space'),
f.get_json_object('jsonStr', "$. a . a ").alias('dot_space_a_space_dot_space_a_space'),
f.get_json_object('jsonStr', "$[' a. a']").alias('space_a_dot_space_a'),
f.get_json_object('jsonStr', "$['a .a ']").alias('a_space_dot_a_space'),
f.get_json_object('jsonStr', "$[' a . a ']").alias('space_a_space_dot_space_a_space'),
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,115 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{ColumnVector,GetJsonObjectOptions}
import scala.util.parsing.combinator.RegexParsers

import ai.rapids.cudf.{ColumnVector, GetJsonObjectOptions, Scalar}
import com.nvidia.spark.rapids.Arm.withResource

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, GetJsonObject}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Copied from Apache Spark org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
// Kept verbatim so GPU path parsing matches Spark's CPU semantics exactly.
/** One step of a parsed JSON path expression. */
private[this] sealed trait PathInstruction
private[this] object PathInstruction {
  // Marker emitted before an Index/Wildcard produced by a `[...]` subscript.
  case object Subscript extends PathInstruction
  // A `[*]`, `.*` or `..` wildcard step.
  case object Wildcard extends PathInstruction
  // Marker emitted before a Named step produced by `.name` or `['name']`.
  case object Key extends PathInstruction
  // A numeric array subscript, e.g. `[3]`.
  case class Index(index: Long) extends PathInstruction
  // A named field access, e.g. `.store` or `['store']`.
  case class Named(name: String) extends PathInstruction
}

/**
 * Combinator parser for Spark-style JSON path expressions (e.g. `$.store.book[0]`).
 * Copied from Apache Spark's jsonExpressions.scala so that the GPU plugin accepts
 * and normalizes exactly the same paths the CPU expression does.
 */
private[this] object JsonPathParser extends RegexParsers {
  import PathInstruction._

  // Every path must start with the root symbol `$`.
  def root: Parser[Char] = '$'

  def long: Parser[Long] = "\\d+".r ^? {
    case x => x.toLong
  }

  // parse `[*]` and `[123]` subscripts
  def subscript: Parser[List[PathInstruction]] =
    for {
      operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
    } yield {
      // Subscript marker precedes the actual operand, mirroring Spark.
      Subscript :: operand :: Nil
    }

  // parse `.name` or `['name']` child expressions
  def named: Parser[List[PathInstruction]] =
    for {
      // `.name` stops at the next `.` or `[`; bracket form excludes `'` and `?`.
      name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']"
    } yield {
      Key :: Named(name) :: Nil
    }

  // child wildcards: `..`, `.*` or `['*']`
  def wildcard: Parser[List[PathInstruction]] =
    (".*" | "['*']") ^^^ List(Wildcard)

  // Ordering matters: wildcard must be tried before `named` so `.*` is not
  // consumed as a field called `*`.
  def node: Parser[List[PathInstruction]] =
    wildcard |
      named |
      subscript

  // A full path: root followed by any number of nodes, flattened into one list.
  val expression: Parser[List[PathInstruction]] = {
    phrase(root ~> rep(node) ^^ (x => x.flatten))
  }

  /** Parses `str`; returns None when the path is not a valid Spark JSON path. */
  def parse(str: String): Option[List[PathInstruction]] = {
    this.parseAll(expression, str) match {
      case Success(result, _) =>
        Some(result)

      case _ =>
        None
    }
  }

  /** True when the path uses any wildcard form the GPU kernel cannot handle. */
  def containsUnsupportedPath(instructions: List[PathInstruction]): Boolean = {
    // Gpu GetJsonObject is not supported if JSON path contains wildcard [*]
    // see https://github.com/NVIDIA/spark-rapids/issues/10216
    instructions.exists {
      case Wildcard => true
      // A `['*']`-style name parses as Named("*") but still means wildcard.
      case Named(name) if name == "*" => true
      case _ => false
    }
  }

  /** Renders instructions back to a canonical path string (bracket notation). */
  def normalize(instructions: List[PathInstruction]): String = {
    // convert List[PathInstruction] to String
    "$" + instructions.map {
      // Marker objects carry no text of their own.
      case Subscript | Key => ""
      case Wildcard => "[*]"
      case Index(index) => s"[$index]"
      // Bracket/quote form avoids ambiguity for names containing `.` or spaces.
      case Named(name) => s"['$name']"
      case _ => throw new IllegalArgumentException(s"Invalid instruction in path")
    }.mkString
  }
}

/**
 * Planning-time metadata for replacing Spark's GetJsonObject with the GPU version.
 * Tags the expression as CPU-only when the (literal) JSON path uses a wildcard,
 * which the GPU kernel does not support.
 */
class GpuGetJsonObjectMeta(
    expr: GetJsonObject,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]],
    rule: DataFromReplacementRule
  ) extends BinaryExprMeta[GetJsonObject](expr, conf, parent, rule) {

  override def tagExprForGpu(): Unit = {
    // The path argument is restricted to a string literal by the ExprChecks in
    // GpuOverrides, so extractLit is expected to succeed for supported plans.
    val lit = GpuOverrides.extractLit(expr.right)
    // foreach (not map): this traversal is purely for its side effect on the
    // meta state; the previous `map` silently discarded its result.
    lit.foreach { l =>
      val instructions = JsonPathParser.parse(l.value.asInstanceOf[UTF8String].toString)
      // parse returns None for invalid paths; only a successfully parsed path
      // containing a wildcard forces a fallback here.
      if (instructions.exists(JsonPathParser.containsUnsupportedPath)) {
        willNotWorkOnGpu("get_json_object on GPU does not support wildcard [*] in path")
      }
    }
  }

  override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
    GpuGetJsonObject(lhs, rhs)
}

case class GpuGetJsonObject(json: Expression, path: Expression)
extends GpuBinaryExpressionArgsAnyScalar
Expand All @@ -32,9 +136,30 @@ case class GpuGetJsonObject(json: Expression, path: Expression)
override def nullable: Boolean = true
override def prettyName: String = "get_json_object"

override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
lhs.getBase().getJSONObject(rhs.getBase,
GetJsonObjectOptions.builder().allowSingleQuotes(true).build());
private var cachedNormalizedPath: Option[Option[String]] = None

/**
 * Normalizes the JSON path scalar into Spark's canonical bracket notation.
 *
 * @param path the path argument as a GPU scalar (expected to be a string)
 * @return Some(normalized path) when the scalar is valid and parses as a
 *         Spark JSON path; None for an invalid (null) scalar or unparsable path
 */
def normalizeJsonPath(path: GpuScalar): Option[String] = {
  // Guard clause: a null/invalid scalar has no path to normalize.
  if (!path.isValid) {
    None
  } else {
    val rawPath = path.getValue.toString
    JsonPathParser.parse(rawPath).map(JsonPathParser.normalize)
  }
}

// Runs get_json_object over a column of JSON strings with a scalar path.
// The path is a literal (enforced at planning time), so its normalized form is
// computed once and cached; the outer Option of cachedNormalizedPath marks
// "cache populated", the inner Option is None for an invalid/unparsable path.
// NOTE(review): caching assumes the same scalar path is passed on every batch
// for this expression instance — confirmed by the literal-only ExprChecks.
override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
  cachedNormalizedPath.getOrElse {
    val normalizedPath: Option[String] = normalizeJsonPath(rhs)
    cachedNormalizedPath = Some(normalizedPath)
    normalizedPath
  } match {
    case Some(normalizedStr) =>
      // withResource frees the temporary device scalar after the kernel call.
      withResource(Scalar.fromString(normalizedStr)) { scalar =>
        lhs.getBase().getJSONObject(scalar,
          GetJsonObjectOptions.builder().allowSingleQuotes(true).build())
      }
    // Invalid path: the result is all nulls, matching Spark's CPU behavior
    // for a null path argument.
    case None => GpuColumnVector.columnVectorFromNull(lhs.getRowCount.toInt, StringType)
  }
}

override def doColumnar(numRows: Int, lhs: GpuScalar, rhs: GpuScalar): ColumnVector = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3646,10 +3646,7 @@ object GpuOverrides extends Logging {
ExprChecks.projectOnly(
TypeSig.STRING, TypeSig.STRING, Seq(ParamCheck("json", TypeSig.STRING, TypeSig.STRING),
ParamCheck("path", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING))),
(a, conf, p, r) => new BinaryExprMeta[GetJsonObject](a, conf, p, r) {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuGetJsonObject(lhs, rhs)
}
(a, conf, p, r) => new GpuGetJsonObjectMeta(a, conf, p, r)
).disabledByDefault("escape sequences are not processed correctly, the input is not " +
"validated, and the output is not normalized the same as Spark"),
expr[JsonToStructs](
Expand Down

0 comments on commit b9c292c

Please sign in to comment.