[SPARK-48148][CORE] JSON objects should not be modified when read as STRING

### What changes were proposed in this pull request?

Currently, when reading a JSON object like this:
```
{"a": {"b": -999.99999999999999999999999999999999995}}
```

With the schema:
```
a STRING
```

Spark will yield a result like this:
```
{"b": -1000.0}
```

Other modifications may also occur, such as changes to the input string's whitespace. In some cases, scientific notation is applied to an input floating-point number when it is read as STRING.

This applies to reading JSON files (as with `spark.read.json`) as well as the SQL expression `from_json`.
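A minimal reproduction sketch (hypothetical; assumes a `spark-shell` session and a file `/tmp/in.json` holding the object above on a single line):

```scala
// Hypothetical repro: /tmp/in.json contains the one-line object
// {"a": {"b": -999.99999999999999999999999999999999995}}
val df = spark.read.schema("a STRING").json("/tmp/in.json")

// Before this change: {"b":-1000.0}   (number rounded and re-serialized)
// After this change:  {"b": -999.99999999999999999999999999999999995}
df.show(truncate = false)
```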

### Why are the changes needed?

Correctness issues may occur if a field is read as a STRING and then later parsed (e.g. with `from_json`) after the contents have been modified.
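To make the failure mode concrete, a hedged sketch reusing `df` from the snippet above: the STRING column already holds the rewritten text, so a later `from_json` can only see the rounded value, never the original literal.

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Re-extract the inner field as text. With the old behavior, col("a") already
// contains {"b":-1000.0}, so the original 35-digit literal is unrecoverable here.
val innerSchema = new StructType().add("b", StringType)
df.select(from_json(col("a"), innerSchema).getField("b").as("b")).show(truncate = false)
// old behavior: -1000.0    new behavior: -999.99999999999999999999999999999999995
```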

### Does this PR introduce _any_ user-facing change?
Yes. When reading non-string fields from a JSON object using the STRING type, we will now extract the field exactly as it appears in the input.

### How was this patch tested?
Added a test in `JsonSuite.scala`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46408 from eric-maynard/SPARK-48148.

Lead-authored-by: Eric Maynard <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
2 people authored and JacobZheng0927 committed May 11, 2024
1 parent e7b9364 commit 15c7cc5
Showing 3 changed files with 122 additions and 8 deletions.
#### JacksonParser.scala

```diff
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
+import org.apache.hadoop.fs.PositionedReadable
 
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
@@ -275,19 +276,63 @@ class JacksonParser(
       }
     }
 
-    case _: StringType =>
-      (parser: JsonParser) => parseJsonToken[UTF8String](parser, dataType) {
-        case VALUE_STRING =>
-          UTF8String.fromString(parser.getText)
-
-        case _ =>
-          // Note that it always tries to convert the data as string without the case of failure.
-          val writer = new ByteArrayOutputStream()
-          Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
-            generator => generator.copyCurrentStructure(parser)
-          }
-          UTF8String.fromBytes(writer.toByteArray)
-      }
+    case _: StringType => (parser: JsonParser) => {
+      // This must be enabled if we will retrieve the bytes directly from the raw content:
+      val includeSourceInLocation = JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION
+      val originalMask = if (includeSourceInLocation.enabledIn(parser.getFeatureMask)) {
+        1
+      } else {
+        0
+      }
+      parser.overrideStdFeatures(includeSourceInLocation.getMask, includeSourceInLocation.getMask)
+      val result = parseJsonToken[UTF8String](parser, dataType) {
+        case VALUE_STRING =>
+          UTF8String.fromString(parser.getText)
+
+        case other =>
+          // Note that it always tries to convert the data as string without the case of failure.
+          val startLocation = parser.currentTokenLocation()
+          def skipAhead(): Unit = {
+            other match {
+              case START_OBJECT =>
+                parser.skipChildren()
+              case START_ARRAY =>
+                parser.skipChildren()
+              case _ =>
+                // Do nothing in this case; we've already read the token
+            }
+          }
+
+          // Slice the exact bytes out of addressable sources; otherwise re-serialize.
+          startLocation.contentReference().getRawContent match {
+            case byteArray: Array[Byte] if exactStringParsing =>
+              skipAhead()
+              val endLocation = parser.currentLocation.getByteOffset
+
+              UTF8String.fromBytes(
+                byteArray,
+                startLocation.getByteOffset.toInt,
+                endLocation.toInt - (startLocation.getByteOffset.toInt))
+            case positionedReadable: PositionedReadable if exactStringParsing =>
+              skipAhead()
+              val endLocation = parser.currentLocation.getByteOffset
+
+              val size = endLocation.toInt - (startLocation.getByteOffset.toInt)
+              val buffer = new Array[Byte](size)
+              positionedReadable.read(startLocation.getByteOffset, buffer, 0, size)
+              UTF8String.fromBytes(buffer, 0, size)
+            case _ =>
+              val writer = new ByteArrayOutputStream()
+              Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+                generator => generator.copyCurrentStructure(parser)
+              }
+              UTF8String.fromBytes(writer.toByteArray)
+          }
+      }
+      // Reset back to the original configuration:
+      parser.overrideStdFeatures(includeSourceInLocation.getMask, originalMask)
+      result
+    }
 
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
@@ -429,6 +474,8 @@ class JacksonParser(
 
   private val allowEmptyString = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON)
 
+  private val exactStringParsing = SQLConf.get.getConf(SQLConf.JSON_EXACT_STRING_PARSING)
+
   /**
    * This function throws an exception for failed conversion. For empty string on data types
    * except for string and binary types, this also throws an exception.
```
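In a nutshell: with Jackson's `INCLUDE_SOURCE_IN_LOCATION` feature enabled, the token location exposes the raw input, so rather than copying the token stream through a generator (which normalizes numbers and whitespace), the parser can slice the field's exact bytes out of the source. A self-contained sketch of that technique in plain Jackson (hypothetical demo code, not part of the patch):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

object RawSliceDemo {
  def main(args: Array[String]): Unit = {
    val bytes = """{"a": {"b": -999.99999999999999999999999999999999995}}""".getBytes("UTF-8")
    val factory = new JsonFactory()
    // Needed so contentReference().getRawContent below returns the input bytes.
    factory.enable(JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION)

    val parser = factory.createParser(bytes)
    parser.nextToken() // START_OBJECT
    parser.nextToken() // FIELD_NAME "a"
    parser.nextToken() // START_OBJECT of the nested value

    val start = parser.currentTokenLocation()
    val raw = start.contentReference().getRawContent.asInstanceOf[Array[Byte]]
    parser.skipChildren() // advance past the matching END_OBJECT
    val end = parser.currentLocation().getByteOffset

    // Prints the nested object exactly as it appears in the input:
    println(new String(raw, start.getByteOffset.toInt,
      (end - start.getByteOffset).toInt, "UTF-8"))
  }
}
```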
#### SQLConf.scala

```diff
@@ -4257,6 +4257,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val JSON_EXACT_STRING_PARSING =
+    buildConf("spark.sql.json.enableExactStringParsing")
+      .internal()
+      .doc("When set to true, string columns extracted from JSON objects will be extracted " +
+        "exactly as they appear in the input string, with no changes")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
     buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
       .internal()
```
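The new flag defaults to `true`; if the legacy output is ever needed, the internal flag can be flipped per session (a usage sketch):

```scala
// Disabling the internal flag restores the old re-serializing behavior.
spark.conf.set("spark.sql.json.enableExactStringParsing", "false")
```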
#### JsonSuite.scala

```diff
@@ -3865,6 +3865,64 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-48148: values are unchanged when read as string") {
+    withTempPath { path =>
+      def extractData(
+          jsonString: String,
+          expectedInexactData: Seq[String],
+          expectedExactData: Seq[String],
+          multiLine: Boolean = false): Unit = {
+        Seq(jsonString).toDF()
+          .repartition(1)
+          .write
+          .mode("overwrite")
+          .text(path.getAbsolutePath)
+
+        withClue("Exact string parsing") {
+          withSQLConf(SQLConf.JSON_EXACT_STRING_PARSING.key -> "true") {
+            val df = spark.read
+              .schema("data STRING")
+              .option("multiLine", multiLine.toString)
+              .json(path.getAbsolutePath)
+            checkAnswer(df, expectedExactData.map(d => Row(d)))
+          }
+        }
+
+        withClue("Inexact string parsing") {
+          withSQLConf(SQLConf.JSON_EXACT_STRING_PARSING.key -> "false") {
+            val df = spark.read
+              .schema("data STRING")
+              .option("multiLine", multiLine.toString)
+              .json(path.getAbsolutePath)
+            checkAnswer(df, expectedInexactData.map(d => Row(d)))
+          }
+        }
+      }
+      extractData(
+        """{"data": {"white": "space"}}""",
+        expectedInexactData = Seq("""{"white":"space"}"""),
+        expectedExactData = Seq("""{"white": "space"}""")
+      )
+      extractData(
+        """{"data": ["white", "space"]}""",
+        expectedInexactData = Seq("""["white","space"]"""),
+        expectedExactData = Seq("""["white", "space"]""")
+      )
+      val granularFloat = "-999.99999999999999999999999999999999995"
+      extractData(
+        s"""{"data": {"v": ${granularFloat}}}""",
+        expectedInexactData = Seq("""{"v":-1000.0}"""),
+        expectedExactData = Seq(s"""{"v": ${granularFloat}}""")
+      )
+      extractData(
+        s"""{"data": {"white":\n"space"}}""",
+        expectedInexactData = Seq("""{"white":"space"}"""),
+        expectedExactData = Seq(s"""{"white":\n"space"}"""),
+        multiLine = true
+      )
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
```
