Use new jni kernel for getJsonObject (NVIDIA#10581)
* Use new kernel for getJsonObject

Signed-off-by: Haoyang Li <[email protected]>

* Use table to pass parsed path

Signed-off-by: Haoyang Li <[email protected]>

* use list/vector of instruction objects

Signed-off-by: Haoyang Li <[email protected]>

* fallback when nested too long

Signed-off-by: Haoyang Li <[email protected]>

* cancel xfail cases

Signed-off-by: Haoyang Li <[email protected]>

* cancel xfail cases

Signed-off-by: Haoyang Li <[email protected]>

* generated and modified docs

Signed-off-by: Haoyang Li <[email protected]>

* wip

Signed-off-by: Haoyang Li <[email protected]>

* wip

Signed-off-by: Haoyang Li <[email protected]>

* apply jni change and remove xpass

Signed-off-by: Haoyang Li <[email protected]>

* Adds test cases

Signed-off-by: Haoyang Li <[email protected]>

---------

Signed-off-by: Haoyang Li <[email protected]>
thirtiseven authored Mar 28, 2024
1 parent b57ffe0 commit d7942e2
Showing 33 changed files with 1,343 additions and 223 deletions.
2 changes: 1 addition & 1 deletion docs/additional-functionality/advanced_configs.md
@@ -248,7 +248,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.FromUnixTime"></a>spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
<a name="sql.expression.GetArrayItem"></a>spark.rapids.sql.expression.GetArrayItem| |Gets the field at `ordinal` in the Array|true|None|
<a name="sql.expression.GetArrayStructFields"></a>spark.rapids.sql.expression.GetArrayStructFields| |Extracts the `ordinal`-th fields of all array elements for the data with the type of array of struct|true|None|
-<a name="sql.expression.GetJsonObject"></a>spark.rapids.sql.expression.GetJsonObject|`get_json_object`|Extracts a json object from path|false|This is disabled by default because escape sequences are not processed correctly, the input is not validated, and the output is not normalized the same as Spark|
+<a name="sql.expression.GetJsonObject"></a>spark.rapids.sql.expression.GetJsonObject|`get_json_object`|Extracts a json object from path|true|None|
<a name="sql.expression.GetMapValue"></a>spark.rapids.sql.expression.GetMapValue| |Gets Value from a Map based on a key|true|None|
<a name="sql.expression.GetStructField"></a>spark.rapids.sql.expression.GetStructField| |Gets the named field of the struct|true|None|
<a name="sql.expression.GetTimestamp"></a>spark.rapids.sql.expression.GetTimestamp| |Gets timestamps from strings using given pattern.|true|None|
31 changes: 6 additions & 25 deletions docs/compatibility.md
@@ -439,31 +439,12 @@ Known issues are:

### get_json_object

-The `GetJsonObject` operator takes a JSON formatted string and a JSON path string as input. The
-code base for this is currently separate from GPU parsing of JSON for files and `FromJsonObject`.
-Because of this the results can be different from each other. Because of several incompatibilities
-and bugs in the GPU version of `GetJsonObject` it will be on the CPU by default. If you are
-aware of the current limitations with the GPU version, you might see a significant performance
-speedup if you enable it by setting `spark.rapids.sql.expression.GetJsonObject` to `true`.
-
-The following is a list of known differences.
-* [No input validation](https://github.com/NVIDIA/spark-rapids/issues/10218). If the input string
-is not valid JSON Apache Spark returns a null result, but ours will still try to find a match.
-* [Escapes are not properly processed for Strings](https://github.com/NVIDIA/spark-rapids/issues/10196).
-When returning a result for a quoted string Apache Spark will remove the quotes and replace
-any escape sequences with the proper characters. The escape sequence processing does not happen
-on the GPU.
-* [Invalid JSON paths could throw exceptions](https://github.com/NVIDIA/spark-rapids/issues/10212)
-If a JSON path is not valid Apache Spark returns a null result, but ours may throw an exception
-and fail the query.
-* [Non-string output is not normalized](https://github.com/NVIDIA/spark-rapids/issues/10218)
-When returning a result for things other than strings, a number of things are normalized by
-Apache Spark, but are not normalized by the GPU, like removing unnecessary white space,
-parsing and then serializing floating point numbers, turning single quotes to double quotes,
-and removing unneeded escapes for single quotes.
-
-The following is a list of bugs in either the GPU version or arguably in Apache Spark itself.
-* https://github.com/NVIDIA/spark-rapids/issues/10219 non-matching quotes in quoted strings
+Known issue:
+- [Non-string output is not normalized](https://github.com/NVIDIA/spark-rapids/issues/10218)
+When returning a result for things other than strings, a number of things are normalized by
+Apache Spark, but are not normalized by the GPU, like removing unnecessary white space,
+parsing and then serializing floating point numbers, turning single quotes to double quotes,
+and removing unneeded escapes for single quotes.

## Avro

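To make the one remaining known issue concrete, here is a minimal, pure-Python sketch (not part of this commit, and not how the plugin is implemented) of the kind of normalization Apache Spark applies to non-string `get_json_object` results: parse the extracted value, then re-serialize it compactly.

```python
import json

def spark_like_normalize(raw: str) -> str:
    # Illustrative only: approximates Spark's normalization of non-string
    # get_json_object output by parsing the extracted JSON value and
    # re-serializing it without padding whitespace. A raw substring match
    # (the un-normalized GPU behavior described above) would keep the
    # original spacing, so the two results can differ textually.
    return json.dumps(json.loads(raw), separators=(",", ":"))

print(spark_like_normalize('{ "price" : 19.95 ,  "color" : "red" }'))
# -> {"price":19.95,"color":"red"}
```

This is why the `test_get_json_object_normalize_non_string_output` case below compares CPU and GPU output on whole-document (`$`) extraction.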
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -6856,7 +6856,7 @@ are limited.
<td rowSpan="3">GetJsonObject</td>
<td rowSpan="3">`get_json_object`</td>
<td rowSpan="3">Extracts a json object from path</td>
-<td rowSpan="3">This is disabled by default because escape sequences are not processed correctly, the input is not validated, and the output is not normalized the same as Spark</td>
+<td rowSpan="3">None</td>
<td rowSpan="3">project</td>
<td>json</td>
<td> </td>
139 changes: 80 additions & 59 deletions integration_tests/src/main/python/get_json_test.py
@@ -37,8 +37,7 @@ def test_get_json_object(json_str_pattern):
'get_json_object(a, "$.store.fruit[0]")',
'get_json_object(\'%s\', "$.store.fruit[0]")' % scalar_json,
),
conf={'spark.sql.parser.escapedStringLiterals': 'true',
'spark.rapids.sql.expression.GetJsonObject': 'true'})
conf={'spark.sql.parser.escapedStringLiterals': 'true'})

def test_get_json_object_quoted_index():
schema = StructType([StructField("jsonStr", StringType())])
@@ -48,23 +48,19 @@ def test_get_json_object_quoted_index():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr',r'''$['a']''').alias('sub_a'),
f.get_json_object('jsonStr',r'''$['b']''').alias('sub_b')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
f.get_json_object('jsonStr',r'''$['b']''').alias('sub_b')))

@pytest.mark.skipif(is_databricks_runtime() and not is_databricks113_or_later(), reason="get_json_object on \
DB 10.4 shows incorrect behaviour with single quotes")
def test_get_json_object_single_quotes():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'''{'a':'A'}'''],
[r'''{'b':'"B'}'''],
[r'''{"c":"'C"}''']]
data = [[r'''{'a':'A'}''']]

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr',r'''$['a']''').alias('sub_a'),
f.get_json_object('jsonStr',r'''$['b']''').alias('sub_b'),
f.get_json_object('jsonStr',r'''$['c']''').alias('sub_c')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
f.get_json_object('jsonStr',r'''$['c']''').alias('sub_c')))

@pytest.mark.parametrize('query',["$.store.bicycle",
"$['store'].bicycle",
@@ -73,35 +73,17 @@ def test_get_json_object_single_quotes():
"$['key with spaces']",
"$.store.book",
"$.store.book[0]",
pytest.param("$",marks=[
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'),
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196'),
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10194')]),
"$",
"$.store.book[0].category",
"$.store.basket[0][1]",
"$.store.basket[0][2].b",
"$.zip code",
"$.fb:testid",
pytest.param("$.a",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196')),
"$.a",
"$.non_exist_key",
"$..no_recursive",
"$.store.book[0].non_exist_key"])
def test_get_json_object_spark_unit_tests(query):
schema = StructType([StructField("jsonStr", StringType())])
data = [
['''{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],"basket":[[1,2,{"b":"y","a":"x"}],[3,4],[5,6]],"book":[{"author":"Nigel Rees","title":"Sayings of the Century","category":"reference","price":8.95},{"author":"Herman Melville","title":"Moby Dick","category":"fiction","price":8.99,"isbn":"0-553-21311-3"},{"author":"J. R. R. Tolkien","title":"The Lord of the Rings","category":"fiction","reader":[{"age":25,"name":"bob"},{"age":26,"name":"jack"}],"price":22.99,"isbn":"0-395-19395-8"}],"bicycle":{"price":19.95,"color":"red"}},"email":"amy@only_for_json_udf_test.net","owner":"amy","zip code":"94025","fb:testid":"1234"}'''],
['''{ "key with spaces": "it works" }'''],
['''{"a":"b\nc"}'''],
['''{"a":"b\"c"}'''],
["\u0000\u0000\u0000A\u0001AAA"],
['{"big": "' + ('x' * 3000) + '"}']]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr', query)),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@allow_non_gpu("ProjectExec", "GetJsonObject")
@pytest.mark.parametrize('query',["$.store.basket[0][*].b",
"$.store.book[0].non_exist_key",
"$.store.basket[0][*].b",
"$.store.book[*].reader",
"$.store.book[*]",
"$.store.book[*].category",
@@ -111,16 +88,20 @@ def test_get_json_object_spark_unit_tests(query):
"$.store.basket[0][*]",
"$.store.basket[*][*]",
"$.store.basket[*].non_exist_key"])
def test_get_json_object_spark_unit_tests_fallback(query):
def test_get_json_object_spark_unit_tests(query):
schema = StructType([StructField("jsonStr", StringType())])
data = [['''{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],"basket":[[1,2,{"b":"y","a":"x"}],[3,4],[5,6]],"book":[{"author":"Nigel Rees","title":"Sayings of the Century","category":"reference","price":8.95},{"author":"Herman Melville","title":"Moby Dick","category":"fiction","price":8.99,"isbn":"0-553-21311-3"},{"author":"J. R. R. Tolkien","title":"The Lord of the Rings","category":"fiction","reader":[{"age":25,"name":"bob"},{"age":26,"name":"jack"}],"price":22.99,"isbn":"0-395-19395-8"}],"bicycle":{"price":19.95,"color":"red"}},"email":"amy@only_for_json_udf_test.net","owner":"amy","zip code":"94025","fb:testid":"1234"}''']]
assert_gpu_fallback_collect(
data = [
['''{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],"basket":[[1,2,{"b":"y","a":"x"}],[3,4],[5,6]],"book":[{"author":"Nigel Rees","title":"Sayings of the Century","category":"reference","price":8.95},{"author":"Herman Melville","title":"Moby Dick","category":"fiction","price":8.99,"isbn":"0-553-21311-3"},{"author":"J. R. R. Tolkien","title":"The Lord of the Rings","category":"fiction","reader":[{"age":25,"name":"bob"},{"age":26,"name":"jack"}],"price":22.99,"isbn":"0-395-19395-8"}],"bicycle":{"price":19.95,"color":"red"}},"email":"amy@only_for_json_udf_test.net","owner":"amy","zip code":"94025","fb:testid":"1234"}'''],
['''{ "key with spaces": "it works" }'''],
['''{"a":"b\nc"}'''],
['''{"a":"b\"c"}'''],
["\u0000\u0000\u0000A\u0001AAA"],
['{"big": "' + ('x' * 3000) + '"}']]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr', query)),
"GetJsonObject",
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
f.get_json_object('jsonStr', query)))

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10218")
# @pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10218")
def test_get_json_object_normalize_non_string_output():
schema = StructType([StructField("jsonStr", StringType())])
data = [[' { "a": "A" } '],
@@ -139,19 +120,16 @@ def test_get_json_object_normalize_non_string_output():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.col('jsonStr'),
f.get_json_object('jsonStr', '$')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
f.get_json_object('jsonStr', '$')))

def test_get_json_object_quoted_question():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'{"?":"QUESTION"}']]

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr',r'''$['?']''').alias('question')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
f.get_json_object('jsonStr',r'''$['?']''').alias('question')))

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10196")
def test_get_json_object_escaped_string_data():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'{"a":"A\"B"}'],
@@ -164,10 +142,8 @@ def test_get_json_object_escaped_string_data():
[r'{"a":"A\tB"}']]

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).selectExpr('get_json_object(jsonStr,"$.a")'),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
lambda spark: spark.createDataFrame(data,schema=schema).selectExpr('get_json_object(jsonStr,"$.a")'))

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10196")
def test_get_json_object_escaped_key():
schema = StructType([StructField("jsonStr", StringType())])
data = [
@@ -203,10 +179,8 @@ def test_get_json_object_escaped_key():
f.get_json_object('jsonStr','$.a\n').alias('qan2'),
f.get_json_object('jsonStr', r'$.a\t').alias('qat1'),
f.get_json_object('jsonStr','$.a\t').alias('qat2')
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
))

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10212")
def test_get_json_object_invalid_path():
schema = StructType([StructField("jsonStr", StringType())])
data = [['{"a":"A"}'],
@@ -227,8 +201,7 @@ def test_get_json_object_invalid_path():
f.get_json_object('jsonStr', 'a').alias('just_a'),
f.get_json_object('jsonStr', '[-1]').alias('neg_one_index'),
f.get_json_object('jsonStr', '$.c[-1]').alias('c_neg_one_index'),
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
))

def test_get_json_object_top_level_array_notation():
# This is a special version of invalid path. It is something that the GPU supports
@@ -244,8 +217,7 @@ def test_get_json_object_top_level_array_notation():
f.get_json_object('jsonStr', '$[1]').alias('one_index'),
f.get_json_object('jsonStr', '''['a']''').alias('sub_a'),
f.get_json_object('jsonStr', '''$['b']''').alias('sub_b'),
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
))

def test_get_json_object_unquoted_array_notation():
# This is a special version of invalid path. It is something that the GPU supports
@@ -260,8 +232,7 @@ def test_get_json_object_unquoted_array_notation():
f.get_json_object('jsonStr', '$[a]').alias('a_index'),
f.get_json_object('jsonStr', '$[1]').alias('one_index'),
f.get_json_object('jsonStr', '''$['1']''').alias('quoted_one_index'),
f.get_json_object('jsonStr', '$[a1]').alias('a_one_index')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
f.get_json_object('jsonStr', '$[a1]').alias('a_one_index')))


def test_get_json_object_white_space_removal():
@@ -298,9 +269,60 @@ def test_get_json_object_white_space_removal():
f.get_json_object('jsonStr', "$[' a. a']").alias('space_a_dot_space_a'),
f.get_json_object('jsonStr', "$['a .a ']").alias('a_space_dot_a_space'),
f.get_json_object('jsonStr', "$[' a . a ']").alias('space_a_space_dot_space_a_space'),
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})
))


def test_get_json_object_jni_java_tests():
schema = StructType([StructField("jsonStr", StringType())])
data = [['\'abc\''],
['[ [11, 12], [21, [221, [2221, [22221, 22222]]]], [31, 32] ]'],
['123'],
['{ \'k\' : \'v\' }'],
['[ [[[ {\'k\': \'v1\'} ], {\'k\': \'v2\'}]], [[{\'k\': \'v3\'}], {\'k\': \'v4\'}], {\'k\': \'v5\'} ]'],
['[1, [21, 22], 3]'],
['[ {\'k\': [0, 1, 2]}, {\'k\': [10, 11, 12]}, {\'k\': [20, 21, 22]} ]'],
['[ [0], [10, 11, 12], [2] ]'],
['[[0, 1, 2], [10, [111, 112, 113], 12], [20, 21, 22]]'],
['[[0, 1, 2], [10, [], 12], [20, 21, 22]]'],
['{\'k\' : [0,1,2]}'],
['{\'k\' : null}']
]

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.col('jsonStr'),
f.get_json_object('jsonStr', '$').alias('dollor'),
f.get_json_object('jsonStr', '$[*][*]').alias('s_w_s_w'),
f.get_json_object('jsonStr', '$.k').alias('dot_k'),
f.get_json_object('jsonStr', '$[*]').alias('s_w'),
f.get_json_object('jsonStr', '$[*].k[*]').alias('s_w_k_s_w'),
f.get_json_object('jsonStr', '$[1][*]').alias('s_1_s_w'),
f.get_json_object('jsonStr', "$[1][1][*]").alias('s_1_s_1_s_w'),
f.get_json_object('jsonStr', "$.k[1]").alias('dot_k_s_1'),
f.get_json_object('jsonStr', "$.*").alias('w'),
))


@allow_non_gpu('ProjectExec')
def test_deep_nested_json():
schema = StructType([StructField("jsonStr", StringType())])
data = [['{"a":{"b":{"c":{"d":{"e":{"f":{"g":{"h":{"i":{"j":{"k":{"l":{"m":{"n":{"o":{"p":{"q":{"r":{"s":{"t":{"u":{"v":{"w":{"x":{"y":{"z":"A"}}'
]]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr', '$.a.b.c.d.e.f.g.h.i').alias('i'),
f.get_json_object('jsonStr', '$.a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p').alias('p')
))

@allow_non_gpu('ProjectExec')
def test_deep_nested_json_fallback():
schema = StructType([StructField("jsonStr", StringType())])
data = [['{"a":{"b":{"c":{"d":{"e":{"f":{"g":{"h":{"i":{"j":{"k":{"l":{"m":{"n":{"o":{"p":{"q":{"r":{"s":{"t":{"u":{"v":{"w":{"x":{"y":{"z":"A"}}'
]]
assert_gpu_fallback_collect(
lambda spark: spark.createDataFrame(data,schema=schema).select(
f.get_json_object('jsonStr', '$.a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z').alias('z')),
'GetJsonObject')

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('json_str_pattern', [r'\{"store": \{"fruit": \[\{"weight":\d,"type":"[a-z]{1,9}"\}\], ' \
@@ -315,8 +337,7 @@ def assert_gpu_did_fallback(sql_text):
assert_gpu_fallback_collect(lambda spark:
gen_df(spark, [('a', gen), ('b', pattern)], length=10).selectExpr(sql_text),
'GetJsonObject',
conf={'spark.sql.parser.escapedStringLiterals': 'true',
'spark.rapids.sql.expression.GetJsonObject': 'true'})
conf={'spark.sql.parser.escapedStringLiterals': 'true'})

assert_gpu_did_fallback('get_json_object(a, b)')
assert_gpu_did_fallback('get_json_object(\'%s\', b)' % scalar_json)
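The deep-nesting fallback exercised by `test_deep_nested_json_fallback` above implies the plugin rejects JSON paths beyond some maximum number of steps. The real limit and path parsing live in the plugin's Scala/JNI code, which is not shown on this page; the following is only a hypothetical Python sketch of such a guard, with `MAX_DEPTH = 16` as an assumed value chosen so that the 26-level path in the test would fall back while the 9- and 16-level paths would not.

```python
import re

# Assumed limit for illustration only; the actual threshold is defined
# in the plugin / JNI kernel, not in this diff.
MAX_DEPTH = 16

def path_depth(json_path: str) -> int:
    # Count named-field steps (".name") and bracket steps ("[0]", "[*]",
    # "['name']") in a get_json_object path such as "$.a.b[0].c".
    steps = re.findall(r"\.[^.\[]+|\[[^\]]*\]", json_path)
    return len(steps)

def should_fallback_to_cpu(json_path: str) -> bool:
    # Hypothetical guard: paths deeper than MAX_DEPTH run on the CPU.
    return path_depth(json_path) > MAX_DEPTH

deep = "$." + ".".join("abcdefghijklmnopqrstuvwxyz")  # 26 steps, a..z
print(should_fallback_to_cpu(deep))  # True
```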