Use new getJsonObject kernel for json_tuple #10635

Merged 9 commits on Apr 25, 2024
Changes from 8 commits
2 changes: 1 addition & 1 deletion docs/additional-functionality/advanced_configs.md
@@ -269,7 +269,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.IsNotNull"></a>spark.rapids.sql.expression.IsNotNull|`isnotnull`|Checks if a value is not null|true|None|
<a name="sql.expression.IsNull"></a>spark.rapids.sql.expression.IsNull|`isnull`|Checks if a value is null|true|None|
<a name="sql.expression.JsonToStructs"></a>spark.rapids.sql.expression.JsonToStructs|`from_json`|Returns a struct value with the given `jsonStr` and `schema`|false|This is disabled by default because it is currently in beta and undergoes continuous enhancements. Please consult the [compatibility documentation](../compatibility.md#json-supporting-types) to determine whether you can enable this configuration for your use case|
<a name="sql.expression.JsonTuple"></a>spark.rapids.sql.expression.JsonTuple|`json_tuple`|Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.|false|This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.|
<a name="sql.expression.JsonTuple"></a>spark.rapids.sql.expression.JsonTuple|`json_tuple`|Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.|false|This is disabled by default because Experimental feature that could be unstable or have performance issues.|
<a name="sql.expression.KnownFloatingPointNormalized"></a>spark.rapids.sql.expression.KnownFloatingPointNormalized| |Tag to prevent redundant normalization|true|None|
<a name="sql.expression.KnownNotNull"></a>spark.rapids.sql.expression.KnownNotNull| |Tag an expression as known to not be null|true|None|
<a name="sql.expression.Lag"></a>spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None|
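For readers trying the expression out, here is a minimal Spark Scala sketch of enabling the config documented above and calling `json_tuple`; it assumes a `SparkSession` named `spark` with the RAPIDS Accelerator already configured, and the sample JSON string is invented for illustration.

```scala
// Minimal sketch: turn on the (off-by-default) GPU JsonTuple expression and
// extract several top-level fields from a JSON string column in one call.
// Assumes `spark` is a SparkSession with the RAPIDS Accelerator configured.
import spark.implicits._

spark.conf.set("spark.rapids.sql.expression.JsonTuple", "true")

val df = Seq("""{"a":"1","email":"x@y.com","owner":"me"}""").toDF("json")
// Each requested name becomes one string output column, null when the key is absent.
df.selectExpr("json_tuple(json, 'a', 'email', 'owner', 'missing')").show()
```

As the table above notes, all inputs and outputs are strings, matching the semantics of `get_json_object` for top-level fields.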
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -8222,7 +8222,7 @@ are limited.
<td rowSpan="3">JsonTuple</td>
<td rowSpan="3">`json_tuple`</td>
<td rowSpan="3">Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.</td>
<td rowSpan="3">This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.</td>
<td rowSpan="3">This is disabled by default because Experimental feature that could be unstable or have performance issues.</td>
<td rowSpan="3">project</td>
<td>json</td>
<td> </td>
21 changes: 8 additions & 13 deletions integration_tests/src/main/python/json_matrix_test.py
@@ -122,7 +122,6 @@ def test_get_json_object_allow_comments_off(std_input_path):

# Off is the default so it really needs to work
@allow_non_gpu(TEXT_INPUT_EXEC)
@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10454')
def test_json_tuple_allow_comments_off(std_input_path):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_COMMENTS_FILE, "json").selectExpr('''json_tuple(json, "str")'''),
@@ -231,7 +230,6 @@ def test_get_json_object_allow_unquoted_field_names_off(std_input_path):

# Off is the default so it really needs to work
@allow_non_gpu(TEXT_INPUT_EXEC)
@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10454')
def test_json_tuple_allow_unquoted_field_names_off(std_input_path):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_UNQUOTE_FIELD_NAMES_FILE, "json").selectExpr('''json_tuple(json, "str")'''),
@@ -292,7 +290,6 @@ def test_get_json_object_allow_numeric_leading_zeros_off(std_input_path):

# Off is the default so it really needs to work
@allow_non_gpu(TEXT_INPUT_EXEC)
@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10454')
def test_json_tuple_allow_numeric_leading_zeros_off(std_input_path):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_NUMERIC_LEAD_ZEROS_FILE, "json").selectExpr('''json_tuple(json, "byte", "int", "float", "decimal")'''),
@@ -355,7 +352,6 @@ def test_get_json_object_allow_nonnumeric_numbers_off(std_input_path):

# Off is the default for json_tuple, so we want this to work
@allow_non_gpu(TEXT_INPUT_EXEC)
@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10454')
def test_json_tuple_allow_nonnumeric_numbers_off(std_input_path):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_NONNUMERIC_NUMBERS_FILE, "json").selectExpr('''json_tuple(json, "float", "double")'''),
@@ -411,7 +407,6 @@ def test_get_json_object_allow_backslash_escape_any_off(std_input_path):

# Off is the default for json_tuple, so we want this to work
@allow_non_gpu(TEXT_INPUT_EXEC)
@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10454')
def test_json_tuple_allow_backslash_escape_any_off(std_input_path):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_BS_ESC_FILE, "json").selectExpr('''json_tuple(json, "str")'''),
@@ -782,19 +777,19 @@ def test_get_json_object_formats(std_input_path, input_file):
conf=_enable_get_json_object_conf)

@pytest.mark.parametrize('input_file', [
pytest.param("int_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')),
pytest.param("float_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')),
pytest.param("sci_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')),
"int_formatted.json",
"float_formatted.json",
"sci_formatted.json",
"int_formatted_strings.json",
"float_formatted_strings.json",
"sci_formatted_strings.json",
"decimal_locale_formatted_strings.json",
"single_quoted_strings.json",
pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')),
pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')),
pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218')),
pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'))])
"boolean_formatted.json",
"invalid_ridealong_columns.json",
"int_array_formatted.json",
"int_struct_formatted.json",
"int_mixed_array_struct_formatted.json"])
@allow_non_gpu(TEXT_INPUT_EXEC)
def test_json_tuple_formats(std_input_path, input_file):
assert_gpu_and_cpu_are_equal_collect(
31 changes: 18 additions & 13 deletions integration_tests/src/main/python/json_tuple_test.py
@@ -32,17 +32,15 @@ def test_json_tuple(json_str_pattern):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen, length=10).selectExpr(
'json_tuple(a, "a", "email", "owner", "b", "b$", "b$$")'),
conf={'spark.sql.parser.escapedStringLiterals': 'true',
'spark.rapids.sql.expression.JsonTuple': 'true'})
conf={'spark.sql.parser.escapedStringLiterals': 'true'})

def test_json_tuple_select_non_generator_col():
gen = StringGen(pattern="{\"Zipcode\":\"abc\",\"ZipCodeType\":\"STANDARD\",\"City\":\"PARC PARQUE\",\"State\":\"PR\"}")
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, [('a', gen)]),
'table',
'select a, json_tuple(a, \"Zipcode\", \"ZipCodeType\", \"City\", \"State\") from table',
conf={'spark.sql.parser.escapedStringLiterals': 'true',
'spark.rapids.sql.expression.JsonTuple': 'true'})
conf={'spark.sql.parser.escapedStringLiterals': 'true'})

@allow_non_gpu('GenerateExec', 'JsonTuple')
@pytest.mark.parametrize('json_str_pattern', json_str_patterns, ids=idfn)
@@ -56,18 +54,25 @@ def test_json_tuple_with_large_number_of_fields_fallback(json_str_pattern):
"location", "city", "country", "zip", "code", "region", "state", "street", "block", "loc", \
"height", "h", "author", "title", "price", "isbn", "book", "rating", "score", "popular")'),
"JsonTuple",
conf={'spark.sql.parser.escapedStringLiterals': 'true',
'spark.rapids.sql.expression.JsonTuple': 'true'})

@allow_non_gpu('GenerateExec', 'JsonTuple')
conf={'spark.sql.parser.escapedStringLiterals': 'true'})

@pytest.mark.parametrize('json_str_pattern', json_str_patterns, ids=idfn)
def test_json_tuple_with_special_characters_fallback(json_str_pattern):
def test_json_tuple_with_special_characters(json_str_pattern):
gen = mk_json_str_gen(json_str_pattern)
special_characters = ['.', '[', ']', '{', '}', '\\\\', '\'', '\\\"']
for special_character in special_characters:
assert_gpu_fallback_collect(
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen, length=10).selectExpr(
'json_tuple(a, "a", "a' + special_character + '")'),
"JsonTuple",
conf={'spark.sql.parser.escapedStringLiterals': 'true',
'spark.rapids.sql.expression.JsonTuple': 'true'})
conf={'spark.sql.parser.escapedStringLiterals': 'true'})

def test_json_tuple_with_slash_backslash():
schema = StructType([StructField("jsonStr", StringType())])
data = [['{"url":"https:\/\/www.nvidia.com\/1\/pic\/-1234.jpg","item":[],"info":{"id":12345}}'],
['{"info":[{"foo":0}],"from":"bar","url":[{"title":"测试\\\\\测试 测试","value_3":"测试;测试;测试"}]}'],
['{"number":"1234567890","info":[{"foo":0}],"from":"bar"}']]

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data, schema).selectExpr(
'json_tuple(jsonStr, "url", "info")'),
conf={'spark.sql.parser.escapedStringLiterals': 'true'})
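The switch above from `assert_gpu_fallback_collect` to `assert_gpu_and_cpu_are_equal_collect` means field names containing JSONPath-special characters now run on the GPU. As a rough Scala illustration of what that enables (the JSON literal here is made up for the example), a name such as `a.b` is matched as a literal key rather than parsed as a path:

```scala
// Sketch only: field names containing characters that are special in JSONPath
// (e.g. '.') are looked up as literal keys by json_tuple, so the GPU no longer
// needs a CPU fallback for them. Assumes `spark` with the RAPIDS plugin and
// spark.rapids.sql.expression.JsonTuple enabled.
import spark.implicits._

val df = Seq("""{"a.b":"1","a":"2"}""").toDF("json")
// Expected to return "1" and "2" on both CPU and GPU, since "a.b" is treated
// as a literal top-level key, not a nested path.
df.selectExpr("""json_tuple(json, 'a.b', 'a')""").show()
```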
@@ -16,10 +16,10 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{GetJsonObjectOptions,Scalar}
import com.nvidia.spark.rapids.Arm._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}
import com.nvidia.spark.rapids.jni.JSONUtils
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -59,26 +59,28 @@ case class GpuJsonTuple(children: Seq[Expression]) extends GpuGenerator
val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase
val schema = Array.fill[DataType](fieldExpressions.length)(StringType)

val fieldScalars = fieldExpressions.safeMap { field =>
val fieldInstructions = fieldExpressions.map { field =>
withResourceIfAllowed(field.columnarEvalAny(inputBatch)) {
case fieldScalar: GpuScalar =>
// Special characters like '.', '[', ']' are not supported in field names
Scalar.fromString("$." + fieldScalar.getBase.getJavaString)
val fieldString = fieldScalar.getBase.getJavaString
val key = new JSONUtils.PathInstructionJni(
JSONUtils.PathInstructionType.KEY, "", -1)
val named = new JSONUtils.PathInstructionJni(
JSONUtils.PathInstructionType.NAMED, fieldString, -1)
Array(key, named)
case _ => throw new UnsupportedOperationException(s"JSON field must be a scalar value")
}
}

withResource(fieldScalars) { fieldScalars =>
withResource(fieldScalars.safeMap(field => json.getJSONObject(field,
GetJsonObjectOptions.builder().allowSingleQuotes(true).build()))) { resultCols =>
val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap {
case (col, dataType) => GpuColumnVector.from(col, dataType)
}
val nonGeneratorCols = (0 until generatorOffset).safeMap { i =>
inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount
}
new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows)
withResource(fieldInstructions.safeMap(field => JSONUtils.getJsonObject(json, field))) {
resultCols =>
val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap {
case (col, dataType) => GpuColumnVector.from(col, dataType)
}
val nonGeneratorCols = (0 until generatorOffset).safeMap { i =>
inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount
}
new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows)
}
}
}
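To make the mapping in the diff above explicit: each requested field name is now translated into a two-step JNI path, a KEY root step followed by a NAMED lookup, instead of the old `"$." + name` string path, so names are never re-parsed as JSONPath. The following is a hedged sketch of that translation, using only the types and calls that appear in the diff; it is an illustration of the data shape handed to the kernel, not a complete implementation.

```scala
// Illustration of the per-field translation performed in GpuJsonTuple above.
import com.nvidia.spark.rapids.jni.JSONUtils

def instructionsFor(fieldName: String): Array[JSONUtils.PathInstructionJni] = Array(
  // Root step, playing the role of the leading "$" in the old "$.<name>" path
  new JSONUtils.PathInstructionJni(JSONUtils.PathInstructionType.KEY, "", -1),
  // Literal member lookup: the field name is passed through verbatim, so
  // characters like '.' or '[' need no escaping and no CPU fallback
  new JSONUtils.PathInstructionJni(JSONUtils.PathInstructionType.NAMED, fieldName, -1)
)

// Old approach (removed): json.getJSONObject(Scalar.fromString("$." + fieldName), opts)
// New approach:           JSONUtils.getJsonObject(json, instructionsFor(fieldName))
```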
@@ -3730,24 +3730,10 @@ object GpuOverrides extends Logging {
// potential performance problems.
willNotWorkOnGpu("JsonTuple with large number of fields is not supported on GPU")
}
// If any field argument contains special characters as follows, fall back to CPU.
(a.children.tail).map { fieldExpr =>
extractLit(fieldExpr).foreach { field =>
if (field.value != null) {
val fieldStr = field.value.asInstanceOf[UTF8String].toString
val specialCharacters = List(".", "[", "]", "{", "}", "\\", "\'", "\"")
if (specialCharacters.exists(fieldStr.contains(_))) {
willNotWorkOnGpu(s"""JsonTuple with special character in field \"$fieldStr\" """
+ "is not supported on GPU")
}
}
}
}
}
override def convertToGpu(): GpuExpression = GpuJsonTuple(childExprs.map(_.convertToGpu()))
}
).disabledByDefault("JsonTuple on the GPU does not support all of the normalization " +
"that the CPU supports."),
).disabledByDefault("it is an experimental feature that could be unstable or have performance issues."),
expr[org.apache.spark.sql.execution.ScalarSubquery](
"Subquery that will return only one row and one column",
ExprChecks.projectOnly(
6 changes: 3 additions & 3 deletions tools/generated_files/311/supportedExprs.csv
@@ -279,9 +279,9 @@ IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because it is currently in beta and undergoes continuous enhancements. Please consult the [compatibility documentation](../compatibility.md#json-supporting-types) to determine whether you can enable this configuration for your use case,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because it is currently in beta and undergoes continuous enhancements. Please consult the [compatibility documentation](../compatibility.md#json-supporting-types) to determine whether you can enable this configuration for your use case,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because it is an experimental feature that could be unstable or have performance issues.,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because it is an experimental feature that could be unstable or have performance issues.,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because it is an experimental feature that could be unstable or have performance issues.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
KnownFloatingPointNormalized,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownFloatingPointNormalized,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownNotNull,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,S,PS,PS,PS,NS
6 changes: 3 additions & 3 deletions tools/generated_files/312/supportedExprs.csv
@@ -279,9 +279,9 @@ IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because it is currently in beta and undergoes continuous enhancements. Please consult the [compatibility documentation](../compatibility.md#json-supporting-types) to determine whether you can enable this configuration for your use case,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because it is currently in beta and undergoes continuous enhancements. Please consult the [compatibility documentation](../compatibility.md#json-supporting-types) to determine whether you can enable this configuration for your use case,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because JsonTuple on the GPU does not support all of the normalization that the CPU supports.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because it is an experimental feature that could be unstable or have performance issues.,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because it is an experimental feature that could be unstable or have performance issues.,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,NS,`json_tuple`,This is disabled by default because it is an experimental feature that could be unstable or have performance issues.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
KnownFloatingPointNormalized,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownFloatingPointNormalized,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownNotNull,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,S,PS,PS,PS,NS