From b9c292cd20467f3cc94dc28ed4f512425446b2ca Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Thu, 29 Feb 2024 01:29:33 +0800 Subject: [PATCH] Use parser from spark to normalize json path in GetJsonObject (#10466) * remove leading space for json path in GetJsonObject Signed-off-by: Haoyang Li * Update comments Signed-off-by: Haoyang Li * Use JsonPathParser to normalize path Signed-off-by: Haoyang Li * Update compatibility doc Signed-off-by: Haoyang Li * clean up Signed-off-by: Haoyang Li * Fallback json paths containing in GetJsonObject Signed-off-by: Haoyang Li * cache normalizeJsonPath and prevent memory leak Signed-off-by: Haoyang Li * clean up Signed-off-by: Haoyang Li * ready to merge Signed-off-by: Haoyang Li * Use parser to check whether to fallback Signed-off-by: Haoyang Li * Add a special case Signed-off-by: Haoyang Li --------- Signed-off-by: Haoyang Li --- docs/compatibility.md | 10 -- .../src/main/python/get_json_test.py | 54 ++++--- .../spark/rapids/GpuGetJsonObject.scala | 137 +++++++++++++++++- .../nvidia/spark/rapids/GpuOverrides.scala | 5 +- 4 files changed, 169 insertions(+), 37 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 925e9c1439b..adcbc9e5cc9 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -479,16 +479,6 @@ The following is a list of known differences. The following is a list of bugs in either the GPU version or arguably in Apache Spark itself. * https://github.com/NVIDIA/spark-rapids/issues/10219 non-matching quotes in quoted strings - * https://github.com/NVIDIA/spark-rapids/issues/10213 array index notation works without root - * https://github.com/NVIDIA/spark-rapids/issues/10214 unquoted array index notation is not - supported - * https://github.com/NVIDIA/spark-rapids/issues/10215 leading spaces can be stripped from named - keys. - * https://github.com/NVIDIA/spark-rapids/issues/10216 It appears that Spark is flattening some - output, which is different from other implementations including the GPU version. - * https://github.com/NVIDIA/spark-rapids/issues/10217 a JSON path execution bug - * https://issues.apache.org/jira/browse/SPARK-46761 Apache Spark does not allow the `?` character in - a quoted JSON path string. ## Avro diff --git a/integration_tests/src/main/python/get_json_test.py b/integration_tests/src/main/python/get_json_test.py index 6916d4929f4..935a61e0562 100644 --- a/integration_tests/src/main/python/get_json_test.py +++ b/integration_tests/src/main/python/get_json_test.py @@ -73,29 +73,19 @@ def test_get_json_object_single_quotes(): "$['key with spaces']", "$.store.book", "$.store.book[0]", - "$.store.book[*]", pytest.param("$",marks=[ pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'), pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196'), pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10194')]), "$.store.book[0].category", - "$.store.book[*].category", - "$.store.book[*].isbn", - pytest.param("$.store.book[*].reader",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10216')), "$.store.basket[0][1]", - "$.store.basket[*]", - "$.store.basket[*][0]", - "$.store.basket[0][*]", - "$.store.basket[*][*]", "$.store.basket[0][2].b", - pytest.param("$.store.basket[0][*].b",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10217')), "$.zip code", "$.fb:testid", pytest.param("$.a",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196')), "$.non_exist_key", - pytest.param("$..no_recursive", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10212')), - "$.store.book[0].non_exist_key", - "$.store.basket[*].non_exist_key"]) + "$..no_recursive", + "$.store.book[0].non_exist_key"]) def test_get_json_object_spark_unit_tests(query): schema = StructType([StructField("jsonStr", StringType())]) data = [ @@ -110,6 +100,26 @@ def test_get_json_object_spark_unit_tests(query): f.get_json_object('jsonStr', query)), conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) +@allow_non_gpu("ProjectExec", "GetJsonObject") +@pytest.mark.parametrize('query',["$.store.basket[0][*].b", + "$.store.book[*].reader", + "$.store.book[*]", + "$.store.book[*].category", + "$.store.book[*].isbn", + "$.store.basket[*]", + "$.store.basket[*][0]", + "$.store.basket[0][*]", + "$.store.basket[*][*]", + "$.store.basket[*].non_exist_key"]) +def test_get_json_object_spark_unit_tests_fallback(query): + schema = StructType([StructField("jsonStr", StringType())]) + data = [['''{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],"basket":[[1,2,{"b":"y","a":"x"}],[3,4],[5,6]],"book":[{"author":"Nigel Rees","title":"Sayings of the Century","category":"reference","price":8.95},{"author":"Herman Melville","title":"Moby Dick","category":"fiction","price":8.99,"isbn":"0-553-21311-3"},{"author":"J. R. R. Tolkien","title":"The Lord of the Rings","category":"fiction","reader":[{"age":25,"name":"bob"},{"age":26,"name":"jack"}],"price":22.99,"isbn":"0-395-19395-8"}],"bicycle":{"price":19.95,"color":"red"}},"email":"amy@only_for_json_udf_test.net","owner":"amy","zip code":"94025","fb:testid":"1234"}''']] + assert_gpu_fallback_collect( + lambda spark: spark.createDataFrame(data,schema=schema).select( + f.get_json_object('jsonStr', query)), + "GetJsonObject", + conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) + @pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10218") def test_get_json_object_normalize_non_string_output(): schema = StructType([StructField("jsonStr", StringType())]) @@ -132,7 +142,6 @@ def test_get_json_object_normalize_non_string_output(): f.get_json_object('jsonStr', '$')), conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) -@pytest.mark.xfail(reason="https://issues.apache.org/jira/browse/SPARK-46761") def test_get_json_object_quoted_question(): schema = StructType([StructField("jsonStr", StringType())]) data = [[r'{"?":"QUESTION"}']] @@ -221,7 +230,6 @@ def test_get_json_object_invalid_path(): ), conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) -@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10213") def test_get_json_object_top_level_array_notation(): # This is a special version of invalid path. It is something that the GPU supports # but the CPU thinks is invalid @@ -239,7 +247,6 @@ def test_get_json_object_top_level_array_notation(): ), conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) -@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10214") def test_get_json_object_unquoted_array_notation(): # This is a special version of invalid path. It is something that the GPU supports # but the CPU thinks is invalid @@ -257,7 +264,6 @@ def test_get_json_object_unquoted_array_notation(): conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) -@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10215") def test_get_json_object_white_space_removal(): # This is a special version of invalid path. It is something that the GPU supports # but the CPU thinks is invalid @@ -265,19 +271,33 @@ def test_get_json_object_white_space_removal(): data = [['{" a":" A"," b":" B"}'], ['{"a":"A","b":"B"}'], ['{"a ":"A ","b ":"B "}'], - ['{" a ":" A "," b ":" B "}']] + ['{" a ":" A "," b ":" B "}'], + ['{" a ": {" a ":" A "}," b ": " B "}'], + ['{" a":"b","a.a":"c","b":{"a":"ab"}}'], + ['{" a":"b"," a. a":"c","b":{"a":"ab"}}'], + ['{" a":"b","a .a ":"c","b":{"a":"ab"}}'], + ['{" a":"b"," a . a ":"c","b":{"a":"ab"}}'] + ] assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.createDataFrame(data,schema=schema).select( f.col('jsonStr'), f.get_json_object('jsonStr', '$.a').alias('dot_a'), f.get_json_object('jsonStr', '$. a').alias('dot_space_a'), + f.get_json_object('jsonStr', '$.\ta').alias('dot_tab_a'), + f.get_json_object('jsonStr', '$. a').alias('dot_spaces_a3'), f.get_json_object('jsonStr', '$.a ').alias('dot_a_space'), f.get_json_object('jsonStr', '$. a ').alias('dot_space_a_space'), f.get_json_object('jsonStr', "$['b']").alias('dot_b'), f.get_json_object('jsonStr', "$[' b']").alias('dot_space_b'), f.get_json_object('jsonStr', "$['b ']").alias('dot_b_space'), f.get_json_object('jsonStr', "$[' b ']").alias('dot_space_b_space'), + f.get_json_object('jsonStr', "$. a. a").alias('dot_space_a_dot_space_a'), + f.get_json_object('jsonStr', "$.a .a ").alias('dot_a_space_dot_a_space'), + f.get_json_object('jsonStr', "$. a . a ").alias('dot_space_a_space_dot_space_a_space'), + f.get_json_object('jsonStr', "$[' a. a']").alias('space_a_dot_space_a'), + f.get_json_object('jsonStr', "$['a .a ']").alias('a_space_dot_a_space'), + f.get_json_object('jsonStr', "$[' a . a ']").alias('space_a_space_dot_space_a_space'), ), conf={'spark.rapids.sql.expression.GetJsonObject': 'true'}) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala index e15d8f90d74..16950368ab0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,115 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{ColumnVector,GetJsonObjectOptions} +import scala.util.parsing.combinator.RegexParsers + +import ai.rapids.cudf.{ColumnVector, GetJsonObjectOptions, Scalar} import com.nvidia.spark.rapids.Arm.withResource -import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, GetJsonObject} import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.unsafe.types.UTF8String + +// Copied from Apache Spark org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +private[this] sealed trait PathInstruction +private[this] object PathInstruction { + case object Subscript extends PathInstruction + case object Wildcard extends PathInstruction + case object Key extends PathInstruction + case class Index(index: Long) extends PathInstruction + case class Named(name: String) extends PathInstruction +} + +private[this] object JsonPathParser extends RegexParsers { + import PathInstruction._ + + def root: Parser[Char] = '$' + + def long: Parser[Long] = "\\d+".r ^? { + case x => x.toLong + } + + // parse `[*]` and `[123]` subscripts + def subscript: Parser[List[PathInstruction]] = + for { + operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']' + } yield { + Subscript :: operand :: Nil + } + + // parse `.name` or `['name']` child expressions + def named: Parser[List[PathInstruction]] = + for { + name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" + } yield { + Key :: Named(name) :: Nil + } + + // child wildcards: `..`, `.*` or `['*']` + def wildcard: Parser[List[PathInstruction]] = + (".*" | "['*']") ^^^ List(Wildcard) + + def node: Parser[List[PathInstruction]] = + wildcard | + named | + subscript + + val expression: Parser[List[PathInstruction]] = { + phrase(root ~> rep(node) ^^ (x => x.flatten)) + } + + def parse(str: String): Option[List[PathInstruction]] = { + this.parseAll(expression, str) match { + case Success(result, _) => + Some(result) + + case _ => + None + } + } + + def containsUnsupportedPath(instructions: List[PathInstruction]): Boolean = { + // Gpu GetJsonObject is not supported if JSON path contains wildcard [*] + // see https://github.com/NVIDIA/spark-rapids/issues/10216 + instructions.exists { + case Wildcard => true + case Named(name) if name == "*" => true + case _ => false + } + } + + def normalize(instructions: List[PathInstruction]): String = { + // convert List[PathInstruction] to String + "$" + instructions.map { + case Subscript | Key => "" + case Wildcard => "[*]" + case Index(index) => s"[$index]" + case Named(name) => s"['$name']" + case _ => throw new IllegalArgumentException(s"Invalid instruction in path") + }.mkString + } +} + +class GpuGetJsonObjectMeta( + expr: GetJsonObject, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule + ) extends BinaryExprMeta[GetJsonObject](expr, conf, parent, rule) { + + override def tagExprForGpu(): Unit = { + val lit = GpuOverrides.extractLit(expr.right) + lit.map { l => + val instructions = JsonPathParser.parse(l.value.asInstanceOf[UTF8String].toString) + if (instructions.exists(JsonPathParser.containsUnsupportedPath)) { + willNotWorkOnGpu("get_json_object on GPU does not support wildcard [*] in path") + } + } + } + + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = + GpuGetJsonObject(lhs, rhs) +} case class GpuGetJsonObject(json: Expression, path: Expression) extends GpuBinaryExpressionArgsAnyScalar @@ -32,9 +136,30 @@ case class GpuGetJsonObject(json: Expression, path: Expression) override def nullable: Boolean = true override def prettyName: String = "get_json_object" - override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { - lhs.getBase().getJSONObject(rhs.getBase, - GetJsonObjectOptions.builder().allowSingleQuotes(true).build()); + private var cachedNormalizedPath: Option[Option[String]] = None + + def normalizeJsonPath(path: GpuScalar): Option[String] = { + if (path.isValid) { + val pathStr = path.getValue.toString() + JsonPathParser.parse(pathStr).map(JsonPathParser.normalize) + } else { + None + } + } + + override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { + cachedNormalizedPath.getOrElse { + val normalizedPath: Option[String] = normalizeJsonPath(rhs) + cachedNormalizedPath = Some(normalizedPath) + normalizedPath + } match { + case Some(normalizedStr) => + withResource(Scalar.fromString(normalizedStr)) { scalar => + lhs.getBase().getJSONObject(scalar, + GetJsonObjectOptions.builder().allowSingleQuotes(true).build()) + } + case None => GpuColumnVector.columnVectorFromNull(lhs.getRowCount.toInt, StringType) + } } override def doColumnar(numRows: Int, lhs: GpuScalar, rhs: GpuScalar): ColumnVector = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 668b438183a..a35cad3b7c5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3646,10 +3646,7 @@ object GpuOverrides extends Logging { ExprChecks.projectOnly( TypeSig.STRING, TypeSig.STRING, Seq(ParamCheck("json", TypeSig.STRING, TypeSig.STRING), ParamCheck("path", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING))), - (a, conf, p, r) => new BinaryExprMeta[GetJsonObject](a, conf, p, r) { - override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuGetJsonObject(lhs, rhs) - } + (a, conf, p, r) => new GpuGetJsonObjectMeta(a, conf, p, r) ).disabledByDefault("escape sequences are not processed correctly, the input is not " + "validated, and the output is not normalized the same as Spark"), expr[JsonToStructs](