From 465818389aab1217c9de5c685cfaee3ffaec91bb Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 22 Mar 2017 09:52:37 -0700
Subject: [PATCH] [SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments

## What changes were proposed in this pull request?

This PR proposes to make the `mode` option in both CSV and JSON use a `case object`, and fixes some comments related to the previous fix. Also, this PR modifies some tests related to parse modes.

## How was this patch tested?

Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.

Author: hyukjinkwon

Closes #17377 from HyukjinKwon/SPARK-19949.
---
 python/pyspark/sql/readwriter.py              |   6 +-
 python/pyspark/sql/streaming.py               |   2 +
 .../expressions/jsonExpressions.scala         |   4 +-
 .../spark/sql/catalyst/json/JSONOptions.scala |  12 +-
 .../sql/catalyst/util/FailureSafeParser.scala |  15 ++-
 .../spark/sql/catalyst/util/ParseMode.scala   |  56 +++++++++
 .../spark/sql/catalyst/util/ParseModes.scala  |  41 -------
 .../expressions/JsonExpressionsSuite.scala    |   4 +-
 .../apache/spark/sql/DataFrameReader.scala    |   4 +-
 .../datasources/csv/CSVOptions.scala          |  14 +--
 .../datasources/json/JsonInferSchema.scala    |   3 +-
 .../sql/streaming/DataStreamReader.scala      |   2 +-
 .../execution/datasources/csv/CSVSuite.scala  |   7 +-
 .../datasources/json/JsonSuite.scala          | 113 +++++++-----------
 14 files changed, 130 insertions(+), 153 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
 delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 122e17f2020f4..759c27507c397 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -369,10 +369,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``-1`` meaning unlimited length.
-        :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
-                                            log for each partition. Malformed records beyond this
-                                            number will be ignored. If None is set, it
-                                            uses the default value, ``10``.
+        :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
+                                            If specified, it is ignored.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 288cc1e4f64dc..e227f9ceb5769 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -625,6 +625,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``-1`` meaning unlimited length.
+        :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
+                                            If specified, it is ignored.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
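Note for reviewers: the user-facing behaviour is unchanged. `mode` is still passed as a plain string through the reader APIs and matched case-insensitively; the patch only changes how that string is resolved internally. A minimal usage sketch (the schema and input path below are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructType, TimestampType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)

// After this patch the mode string is resolved via ParseMode.fromString:
// "dropMalformed" matches case-insensitively, and an unknown value such as
// "abcd" falls back to PERMISSIVE with a logged warning.
val df = spark.read
  .option("mode", "dropMalformed")
  .schema(schema)
  .csv("/tmp/people.csv")  // hypothetical input path
```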
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 08af5522d822d..df4d406b84d60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, GenericArrayData, ParseModes} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -548,7 +548,7 @@ case class JsonToStructs( lazy val parser = new JacksonParser( rowSchema, - new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get)) + new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) override def dataType: DataType = schema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 355c26afa6f0d..c22b1ade4e64b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -65,7 +65,8 @@ private[sql] class JSONOptions( val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) - val parseMode = parameters.getOrElse("mode", "PERMISSIVE") + val parseMode: ParseMode = + parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) val columnNameOfCorruptRecord = parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord) @@ -82,15 +83,6 @@ private[sql] class JSONOptions( val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false) - // Parse mode flags - if (!ParseModes.isValidMode(parseMode)) { - logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.") - } - - val failFast = ParseModes.isFailFastMode(parseMode) - val dropMalformed = ParseModes.isDropMalformedMode(parseMode) - val permissive = ParseModes.isPermissiveMode(parseMode) - /** Sets config options on a Jackson [[JsonFactory]]. 
*/ def setJacksonOptions(factory: JsonFactory): Unit = { factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index e8da10d65ecb9..725e3015b3416 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], - mode: String, + mode: ParseMode, schema: StructType, columnNameOfCorruptRecord: String) { @@ -58,11 +58,14 @@ class FailureSafeParser[IN]( try { rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) } catch { - case e: BadRecordException if ParseModes.isPermissiveMode(mode) => - Iterator(toResultRow(e.partialResult(), e.record)) - case _: BadRecordException if ParseModes.isDropMalformedMode(mode) => - Iterator.empty - case e: BadRecordException => throw e.cause + case e: BadRecordException => mode match { + case PermissiveMode => + Iterator(toResultRow(e.partialResult(), e.record)) + case DropMalformedMode => + Iterator.empty + case FailFastMode => + throw e.cause + } } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala new file mode 100644 index 0000000000000..4565dbde88c88 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.internal.Logging + +sealed trait ParseMode { + /** + * String name of the parse mode. + */ + def name: String +} + +/** + * This mode permissively parses the records. + */ +case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" } + +/** + * This mode ignores the whole corrupted records. + */ +case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" } + +/** + * This mode throws an exception when it meets corrupted records. + */ +case object FailFastMode extends ParseMode { val name = "FAILFAST" } + +object ParseMode extends Logging { + /** + * Returns the parse mode from the given string. + */ + def fromString(mode: String): ParseMode = mode.toUpperCase match { + case PermissiveMode.name => PermissiveMode + case DropMalformedMode.name => DropMalformedMode + case FailFastMode.name => FailFastMode + case _ => + logWarning(s"$mode is not a valid parse mode. 
Using ${PermissiveMode.name}.") + PermissiveMode + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala deleted file mode 100644 index 0e466962b4678..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.util - -object ParseModes { - val PERMISSIVE_MODE = "PERMISSIVE" - val DROP_MALFORMED_MODE = "DROPMALFORMED" - val FAIL_FAST_MODE = "FAILFAST" - - val DEFAULT = PERMISSIVE_MODE - - def isValidMode(mode: String): Boolean = { - mode.toUpperCase match { - case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true - case _ => false - } - } - - def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE - def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE - def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) { - mode.toUpperCase == PERMISSIVE_MODE - } else { - true // We default to permissive is the mode string is not valid - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index e4698d44636b6..c5b72235e5db0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -21,7 +21,7 @@ import java.util.Calendar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes} +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -367,7 +367,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { // Other modes should still return `null`. 
checkEvaluation(
-      JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
       null
     )
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 767a636d70731..e39b4d91f1f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -510,10 +510,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
-   * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
-   * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
-   * during parsing.
+   * during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
    *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index f6c6b6f56cd9d..5d2c23ed9618c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -82,7 +82,8 @@ class CSVOptions(
   val delimiter = CSVUtils.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val parseMode: ParseMode =
+    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
   val charset = parameters.getOrElse("encoding",
     parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
@@ -95,15 +96,6 @@ class CSVOptions(
   val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
   val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  val failFast = ParseModes.isFailFastMode(parseMode)
-  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  val permissive = ParseModes.isPermissiveMode(parseMode)
-
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
@@ -139,8 +131,6 @@ class CSVOptions(
   val escapeQuotes = getBool("escapeQuotes", true)
-  val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
-
   val quoteAll = getBool("quoteAll", false)
   val inputBufferSize = 128
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index 7475f8ec79331..e15c30b4374bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.PermissiveMode
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -40,7 +41,7 @@ private[sql] object JsonInferSchema {
       json: RDD[T],
       configOptions: JSONOptions,
       createParser: (JsonFactory, T) => JsonParser): StructType = {
-    val shouldHandleCorruptRecord = configOptions.permissive
+    val shouldHandleCorruptRecord = configOptions.parseMode == PermissiveMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
     // perform schema inference on each row and merge afterwards
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 388ef182ce3a6..f6e2fef74b8db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -260,7 +260,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
-   * during parsing.
+   * during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
    *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 598babfe0e7ad..2600894ca303c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -992,9 +992,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
     Seq(false, true).foreach { wholeFile =>
       val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+      // We use `PERMISSIVE` mode by default if invalid string is given.
       val df1 = spark
         .read
-        .option("mode", "PERMISSIVE")
+        .option("mode", "abcd")
         .option("wholeFile", wholeFile)
         .schema(schema)
         .csv(testFile(valueMalformedFile))
@@ -1008,7 +1009,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
       val df2 = spark
         .read
-        .option("mode", "PERMISSIVE")
+        .option("mode", "Permissive")
         .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
         .option("wholeFile", wholeFile)
         .schema(schemaWithCorrField1)
@@ -1025,7 +1026,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .add("b", TimestampType)
       val df3 = spark
         .read
-        .option("mode", "PERMISSIVE")
+        .option("mode", "permissive")
         .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
         .option("wholeFile", wholeFile)
         .schema(schemaWithCorrField2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 56fcf773f7dd9..b09cef76d2be7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1083,83 +1083,59 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }

   test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
-    withTempView("jsonTable") {
-      val schema = StructType(
-        StructField("a", StringType, true) ::
-        StructField("b", StringType, true) ::
-        StructField("c", StringType, true) :: Nil)
+    val schema = StructType(
+      StructField("a", StringType, true) ::
+      StructField("b", StringType, true) ::
+      StructField("c", StringType, true) :: Nil)

-      val jsonDF = spark.read.schema(schema).json(corruptRecords)
-      jsonDF.createOrReplaceTempView("jsonTable")
+    val jsonDF = spark.read.schema(schema).json(corruptRecords)

-      checkAnswer(
-        sql(
-          """
-            |SELECT a, b, c
-            |FROM jsonTable
-          """.stripMargin),
-        Seq(
-          // Corrupted records are replaced with null
-          Row(null, null, null),
-          Row(null, null, null),
-          Row(null, null, null),
-          Row("str_a_4", "str_b_4", "str_c_4"),
-          Row(null, null, null))
-      )
-    }
+    checkAnswer(
+      jsonDF.select($"a", $"b", $"c"),
+      Seq(
+        // Corrupted records are replaced with null
+        Row(null, null, null),
+        Row(null, null, null),
+        Row(null, null, null),
+        Row("str_a_4", "str_b_4", "str_c_4"),
+        Row(null, null, null))
+    )
   }

   test("Corrupt records: PERMISSIVE mode, with designated
column for malformed records") { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { - withTempView("jsonTable") { - val jsonDF = spark.read.json(corruptRecords) - jsonDF.createOrReplaceTempView("jsonTable") - val schema = StructType( - StructField("_unparsed", StringType, true) :: + val jsonDF = spark.read.json(corruptRecords) + val schema = StructType( + StructField("_unparsed", StringType, true) :: StructField("a", StringType, true) :: StructField("b", StringType, true) :: StructField("c", StringType, true) :: Nil) - assert(schema === jsonDF.schema) - - // In HiveContext, backticks should be used to access columns starting with a underscore. - checkAnswer( - sql( - """ - |SELECT a, b, c, _unparsed - |FROM jsonTable - """.stripMargin), - Row(null, null, null, "{") :: - Row(null, null, null, """{"a":1, b:2}""") :: - Row(null, null, null, """{"a":{, b:3}""") :: - Row("str_a_4", "str_b_4", "str_c_4", null) :: - Row(null, null, null, "]") :: Nil - ) - - checkAnswer( - sql( - """ - |SELECT a, b, c - |FROM jsonTable - |WHERE _unparsed IS NULL - """.stripMargin), - Row("str_a_4", "str_b_4", "str_c_4") - ) - - checkAnswer( - sql( - """ - |SELECT _unparsed - |FROM jsonTable - |WHERE _unparsed IS NOT NULL - """.stripMargin), - Row("{") :: - Row("""{"a":1, b:2}""") :: - Row("""{"a":{, b:3}""") :: - Row("]") :: Nil - ) - } + assert(schema === jsonDF.schema) + + // In HiveContext, backticks should be used to access columns starting with a underscore. + checkAnswer( + jsonDF.select($"a", $"b", $"c", $"_unparsed"), + Row(null, null, null, "{") :: + Row(null, null, null, """{"a":1, b:2}""") :: + Row(null, null, null, """{"a":{, b:3}""") :: + Row("str_a_4", "str_b_4", "str_c_4", null) :: + Row(null, null, null, "]") :: Nil + ) + + checkAnswer( + jsonDF.filter($"_unparsed".isNull).select($"a", $"b", $"c"), + Row("str_a_4", "str_b_4", "str_c_4") + ) + + checkAnswer( + jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"), + Row("{") :: + Row("""{"a":1, b:2}""") :: + Row("""{"a":{, b:3}""") :: + Row("]") :: Nil + ) } } @@ -1952,19 +1928,20 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { StructField("c", StringType, true) :: Nil) val errMsg = intercept[AnalysisException] { spark.read - .option("mode", "PERMISSIVE") + .option("mode", "Permissive") .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) .schema(schema) .json(corruptRecords) }.getMessage assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + // We use `PERMISSIVE` mode by default if invalid string is given. withTempPath { dir => val path = dir.getCanonicalPath corruptRecords.toDF("value").write.text(path) val errMsg = intercept[AnalysisException] { spark.read - .option("mode", "PERMISSIVE") + .option("mode", "permm") .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) .schema(schema) .json(path)
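A closing note on the design: because `ParseMode` is a sealed trait, every consumer of a mode (such as `FailureSafeParser` above) gets compiler-checked exhaustive matching, which the old string constants in `ParseModes` could not offer. A minimal standalone sketch of that pattern; the `describe` helper is hypothetical and written only to illustrate the match:

```scala
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Omitting a case here produces a non-exhaustive-match warning at compile
// time, unlike the old ParseModes string comparisons.
def describe(mode: ParseMode): String = mode match {
  case PermissiveMode    => "null out unparsable fields and keep the record"
  case DropMalformedMode => "drop malformed records entirely"
  case FailFastMode      => "throw the underlying exception immediately"
}

// Resolution is case-insensitive, and an invalid string falls back to the
// default with a logged warning.
assert(ParseMode.fromString("failfast") == FailFastMode)
assert(ParseMode.fromString("not-a-mode") == PermissiveMode)
```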