[SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when reading CSV

### What changes were proposed in this pull request?
There are some differences between Spark CSV, opencsv, and commons-csv; a typical case is described in SPARK-33566: when a value contains both unescaped quotes and an unescaped qualifier, the parsing results differ.

The reason for the difference is that Spark uses `STOP_AT_DELIMITER` as the default `UnescapedQuoteHandling` when building the `CsvParser`, and it is not configurable.

On the other hand, opencsv and commons-csv by default use a parsing mechanism similar to `STOP_AT_CLOSING_QUOTE`.

So this PR makes the `unescapedQuoteHandling` option configurable, so that the same parsing result as opencsv and commons-csv can be obtained.
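
As an illustration, here is a minimal PySpark sketch of how the new option could be used (the `spark` session and the input file `data.csv` are assumptions for illustration, not part of this change):

```python
# Minimal sketch: read a CSV whose values contain unescaped quotes.
# `data.csv` is a hypothetical input file; `spark` is an existing SparkSession.
df = (spark.read
      .option("header", "true")
      # New option from this PR; STOP_AT_CLOSING_QUOTE mimics the opencsv/commons-csv behaviour.
      .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
      .csv("data.csv"))
df.show(truncate=False)
```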

### Why are the changes needed?
Making the `unescapedQuoteHandling` option configurable when reading CSV makes parsing more flexible.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Actions checks

- Add a new test case similar to the one described in SPARK-33566

Closes #30518 from LuciferYang/SPARK-33566.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
LuciferYang authored and HyukjinKwon committed Nov 27, 2020
1 parent d082ad0 commit 433ae90
Showing 8 changed files with 122 additions and 5 deletions.
26 changes: 24 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -522,7 +522,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
unescapedQuoteHandling=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
unescapedQuoteHandling : str, optional
defines how the CsvParser will handle values with unescaped quotes. If None is
set, it uses the default value, ``STOP_AT_DELIMITER``.
* ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
the quote character and proceed parsing the value as a quoted value, until a closing
quote is found.
* ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters of the current
parsed value until the delimiter is found. If no delimiter is found in the value, the
parser will continue accumulating characters from the input until a delimiter or line
ending is found.
* ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters until the
delimiter or a line ending is found in the input.
* ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
for the given value will be skipped and the value set in nullValue will be produced
instead.
* ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
will be thrown.
Examples
--------
@@ -708,7 +729,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
unescapedQuoteHandling=unescapedQuoteHandling)
if isinstance(path, str):
path = [path]
if type(path) == list:
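The diff above also adds `unescapedQuoteHandling` as a keyword argument of `DataFrameReader.csv`, so it can be passed directly instead of via `.option(...)`. A minimal sketch, assuming an existing `spark` session and a hypothetical `data.csv`:

```python
# Equivalent to calling .option("unescapedQuoteHandling", "BACK_TO_DELIMITER") on the reader.
df = spark.read.csv("data.csv", header=True, unescapedQuoteHandling="BACK_TO_DELIMITER")
```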
1 change: 1 addition & 0 deletions python/pyspark/sql/readwriter.pyi
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
lineSep: Optional[str] = ...,
pathGlobFilter: Optional[Union[bool, str]] = ...,
recursiveFileLookup: Optional[Union[bool, str]] = ...,
unescapedQuoteHandling: Optional[str] = ...,
) -> DataFrame: ...
def orc(
self,
25 changes: 23 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -761,7 +761,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
pathGlobFilter=None, recursiveFileLookup=None):
pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
recursiveFileLookup : str or bool, optional
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
unescapedQuoteHandling : str, optional
defines how the CsvParser will handle values with unescaped quotes. If None is
set, it uses the default value, ``STOP_AT_DELIMITER``.
* ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
the quote character and proceed parsing the value as a quoted value, until a closing
quote is found.
* ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters of the current
parsed value until the delimiter is found. If no delimiter is found in the value, the
parser will continue accumulating characters from the input until a delimiter or line
ending is found.
* ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters until the
delimiter or a line ending is found in the input.
* ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
for the given value will be skipped and the value set in nullValue will be produced
instead.
* ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
will be thrown.
.. versionadded:: 2.0.0
@@ -926,7 +946,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
unescapedQuoteHandling=unescapedQuoteHandling)
if isinstance(path, str):
return self._df(self._jreader.csv(path))
else:
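The same option is exposed on `DataStreamReader.csv` for Structured Streaming. A minimal streaming sketch, assuming an existing `spark` session and a hypothetical input directory (streaming CSV sources need an explicit schema):

```python
# Hypothetical streaming read; the schema and the input directory are assumptions for illustration.
stream_df = (spark.readStream
             .schema("c1 STRING, c2 STRING")
             .option("header", "true")
             .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
             .csv("/path/to/csv-input-dir"))
query = stream_df.writeStream.format("console").start()
```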
1 change: 1 addition & 0 deletions python/pyspark/sql/streaming.pyi
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
lineSep: Optional[str] = ...,
pathGlobFilter: Optional[Union[bool, str]] = ...,
recursiveFileLookup: Optional[Union[bool, str]] = ...,
unescapedQuoteHandling: Optional[str] = ...,
) -> DataFrame: ...

class DataStreamWriter:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
}
val lineSeparatorInWrite: Option[String] = lineSeparator

/**
* The handling method to be used when unescaped quotes are found in the input.
*/
val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
.getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
settings.setNullValue(nullValue)
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
21 changes: 21 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
* will handle values with unescaped quotes.
* <ul>
* <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
* the quote character and proceed parsing the value as a quoted value, until a closing
* quote is found.</li>
* <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters of the current
* parsed value until the delimiter is found. If no
* delimiter is found in the value, the parser will continue accumulating characters from
* the input until a delimiter or line ending is found.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters until the
* delimiter or a line ending is found in the input.</li>
* <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
* for the given value will be skipped and the value set in nullValue will be produced
* instead.</li>
* <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
* will be thrown.</li>
* </ul>
* </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes. Note that Spark tries
* to parse only required columns in CSV under column pruning. Therefore, corrupt records
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
* will handle values with unescaped quotes.
* <ul>
* <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
* the quote character and proceed parsing the value as a quoted value, until a closing
* quote is found.</li>
* <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters of the current
* parsed value until the delimiter is found. If no delimiter is found in the value, the
* parser will continue accumulating characters from the input until a delimiter or line
* ending is found.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters until the
* delimiter or a line ending is found in the input.</li>
* <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
* for the given value will be skipped and the value set in nullValue will be produced
* instead.</li>
* <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
* will be thrown.</li>
* </ul>
* </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2428,6 +2428,30 @@ abstract class CSVSuite
assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
}
}

test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
"unescaped quotes and unescaped delimiter data correctly") {
withTempPath { path =>
val dataPath = path.getCanonicalPath
val row1 = Row("""a,""b,c""", "xyz")
val row2 = Row("""a,b,c""", """x""yz""")
// Generate the test data: use `,` as the delimiter and `"` as the quote character, but leave them unescaped.
Seq(
"""c1,c2""",
s""""${row1.getString(0)}","${row1.getString(1)}"""",
s""""${row2.getString(0)}","${row2.getString(1)}"""")
.toDF().repartition(1).write.text(dataPath)
// Without configuring UnescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
// the result will be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
val result = spark.read
.option("inferSchema", "true")
.option("header", "true")
.option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
.csv(dataPath).collect()
val expectedResults = Array(row1, row2)
assert(result.sameElements(expectedResults))
}
}
}

class CSVv1Suite extends CSVSuite {
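
For reference, a hedged PySpark re-creation of the scenario exercised by the new test (the temporary path is a placeholder; the expected values are taken from the test comments above):

```python
# Write the same unescaped test data used by the Scala test to a plain text file.
data_path = "/tmp/spark-33566-demo"  # hypothetical location
lines = ['c1,c2', '"a,""b,c","xyz"', '"a,b,c","x""yz"']
spark.createDataFrame([(l,) for l in lines], "value STRING") \
    .coalesce(1).write.mode("overwrite").text(data_path)

# With the default STOP_AT_DELIMITER the first data row parses as ('"a,""b', 'c"');
# with STOP_AT_CLOSING_QUOTE it parses as ('a,""b,c', 'xyz'), matching opencsv/commons-csv.
result = (spark.read
          .option("header", "true")
          .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
          .csv(data_path))
result.show(truncate=False)
```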
