Improve JSON and CSV parsing of integer values (#4790)
* Improve JSON and CSV support for boolean values

Signed-off-by: Andy Grove <[email protected]>

* implement custom boolean parsing logic for CSV and JSON

* Improve JSON and CSV parsing of integer values

Signed-off-by: Andy Grove <[email protected]>

* Add more tests. Remove csv configs that are no longer needed

Signed-off-by: Andy Grove <[email protected]>

* fix regression

* remove duplicate code

* add XFAIL test for JSON strings containing integers

* Save

* Save
andygrove authored Feb 17, 2022
1 parent c18b4e5 commit 1db5070
Showing 14 changed files with 96 additions and 142 deletions.
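The net effect of this change is that the GPU CSV and JSON readers parse integral columns (byte, short, int, long) with custom validation logic instead of relying on cuDF's lenient defaults, returning null for malformed values just as CPU Spark does. A minimal sketch of the intended behavior, assuming a SparkSession named `spark` with the RAPIDS plugin enabled (the file path is illustrative):

    from pyspark.sql.types import StructType, StructField, IntegerType

    schema = StructType([StructField('number', IntegerType())])

    # After this change the GPU reader returns null for malformed values such
    # as '1e3' or 'twelve', matching CPU Spark, instead of leniently producing 0.
    df = spark.read.schema(schema).option('header', 'true').csv('/tmp/simple_int_values.csv')
    df.show()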
7 changes: 0 additions & 7 deletions docs/configs.md
@@ -67,14 +67,7 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csv.read.bool.enabled"></a>spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false
<a name="sql.csv.read.byte.enabled"></a>spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.date.enabled"></a>spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
<a name="sql.csv.read.double.enabled"></a>spark.rapids.sql.csv.read.double.enabled|Parsing CSV double has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false
<a name="sql.csv.read.float.enabled"></a>spark.rapids.sql.csv.read.float.enabled|Parsing CSV floats has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false
<a name="sql.csv.read.integer.enabled"></a>spark.rapids.sql.csv.read.integer.enabled|Parsing CSV integers is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.long.enabled"></a>spark.rapids.sql.csv.read.long.enabled|Parsing CSV longs is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.short.enabled"></a>spark.rapids.sql.csv.read.short.enabled|Parsing CSV shorts is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
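With the custom parsing in place, the type-specific opt-in flags are deleted outright rather than defaulted to true. A sketch of the session setup they used to require (these keys no longer exist after this commit, so setting them has no effect):

    # Obsolete after this commit: previously required to opt in to GPU CSV
    # reads of integral columns.
    for key in [
        'spark.rapids.sql.csv.read.byte.enabled',
        'spark.rapids.sql.csv.read.short.enabled',
        'spark.rapids.sql.csv.read.integer.enabled',
        'spark.rapids.sql.csv.read.long.enabled',
    ]:
        spark.conf.set(key, 'true')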
20 changes: 12 additions & 8 deletions integration_tests/src/main/python/csv_test.py
@@ -214,21 +214,25 @@ def read_impl(spark):
pytest.param('trucks-missing-quotes.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')),
pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true', 'nullValue': 'null'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2068')),
pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}),
pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}),
pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}),
pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}),
('simple_int_values.csv', _float_schema, {'header': 'true'}),
('simple_int_values.csv', _double_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _byte_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _short_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _int_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}),
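The xfail marks tied to issue #126 come off the integer-schema cases of simple_int_values.csv, and new cases read simple_float_values.csv against byte/short/int/long schemas, exercising float-formatted text in integral columns. A rough stand-alone equivalent of one new case, assuming the suite's assert_gpu_and_cpu_are_equal_collect helper (the wiring shown here is illustrative):

    from pyspark.sql.types import StructType, StructField, ByteType

    _byte_schema = StructType([StructField('number', ByteType())])

    def read_simple_floats(spark):
        return spark.read.schema(_byte_schema) \
            .option('header', 'true') \
            .csv('src/test/resources/simple_float_values.csv')

    # assert_gpu_and_cpu_are_equal_collect(read_simple_floats)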
16 changes: 15 additions & 1 deletion integration_tests/src/main/python/json_test.py
@@ -41,6 +41,18 @@
_bool_schema = StructType([
StructField('number', BooleanType())])

_byte_schema = StructType([
StructField('number', ByteType())])

_short_schema = StructType([
StructField('number', ShortType())])

_int_schema = StructType([
StructField('number', IntegerType())])

_long_schema = StructType([
StructField('number', LongType())])

_float_schema = StructType([
StructField('number', FloatType())])

@@ -175,14 +187,16 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
@pytest.mark.parametrize('filename', [
'boolean.json',
pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')),
'ints.json',
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')),
'nan_and_inf.json',
pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
])
@pytest.mark.parametrize('schema', [_bool_schema, _float_schema, _double_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
9 changes: 9 additions & 0 deletions integration_tests/src/test/resources/ints.json
@@ -0,0 +1,9 @@
{ "number": 0 }
{ "number": -128 }
{ "number": 127 }
{ "number": -32768 }
{ "number": 32767 }
{ "number": -2147483648 }
{ "number": 2147483647 }
{ "number": -9223372036854775808 }
{ "number": 9223372036854775807 }
11 changes: 11 additions & 0 deletions integration_tests/src/test/resources/ints_invalid.json
@@ -0,0 +1,11 @@
{ "number": true }
{ "number": 3.141 }
{ "number": "0" }
{ "number": "-128" }
{ "number": "127" }
{ "number": "-32768" }
{ "number": "32767" }
{ "number": "-2147483648" }
{ "number": "2147483647" }
{ "number": "-9223372036854775808" }
{ "number": "9223372036854775807" }
@@ -48,12 +48,6 @@ class MortgageSparkSuite extends FunSuite {
.config("spark.rapids.sql.incompatibleOps.enabled", true)
.config("spark.rapids.sql.hasNans", false)
.config("spark.rapids.sql.csv.read.date.enabled", true)
.config("spark.rapids.sql.csv.read.byte.enabled", true)
.config("spark.rapids.sql.csv.read.short.enabled", true)
.config("spark.rapids.sql.csv.read.integer.enabled", true)
.config("spark.rapids.sql.csv.read.long.enabled", true)
.config("spark.rapids.sql.csv.read.float.enabled", true)
.config("spark.rapids.sql.csv.read.double.enabled", true)
val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass
val prop = System.getProperty("rapids.shuffle.manager.override", "false")
if (prop.equalsIgnoreCase("true")) {
@@ -222,41 +222,6 @@ object GpuCSVScan {
}
}

if (!meta.conf.isCsvBoolReadEnabled && readSchema.map(_.dataType).contains(BooleanType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading boolean. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BOOLS} to true.")
}

if (!meta.conf.isCsvByteReadEnabled && readSchema.map(_.dataType).contains(ByteType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading bytes. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BYTES} to true.")
}

if (!meta.conf.isCsvShortReadEnabled && readSchema.map(_.dataType).contains(ShortType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading shorts. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_SHORTS} to true.")
}

if (!meta.conf.isCsvIntReadEnabled && readSchema.map(_.dataType).contains(IntegerType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading integers. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_INTEGERS} to true.")
}

if (!meta.conf.isCsvLongReadEnabled && readSchema.map(_.dataType).contains(LongType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading longs. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_LONGS} to true.")
}

if (!meta.conf.isCsvFloatReadEnabled && readSchema.map(_.dataType).contains(FloatType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading floats. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_FLOATS} to true.")
}

if (!meta.conf.isCsvDoubleReadEnabled && readSchema.map(_.dataType).contains(DoubleType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading doubles. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DOUBLES} to true.")
}

if (readSchema.map(_.dataType).contains(TimestampType)) {
if (!meta.conf.isCsvTimestampReadEnabled) {
meta.willNotWorkOnGpu("GpuCSVScan does not support parsing timestamp types. To " +
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import scala.collection.mutable.ListBuffer
import scala.math.max

import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
@@ -166,11 +166,13 @@ abstract class GpuTextBasedPartitionReader(
readDataSchema
}

// read boolean and floating-point columns as strings in cuDF
// read boolean and numeric columns as strings in cuDF
val dataSchemaWithStrings = StructType(dataSchema.fields
.map(f => {
f.dataType match {
case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.DoubleType =>
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
DataTypes.DoubleType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
@@ -188,7 +190,7 @@
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)

// parse boolean and floating-point columns that were read as strings
// parse boolean and numeric columns that were read as strings
val castTable = withResource(table) { _ =>
val columns = new ListBuffer[ColumnVector]()
// Table increases the ref counts on the columns so we have
@@ -197,9 +199,17 @@
// ansi mode does not apply to text inputs
val ansiEnabled = false
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = dataSchema.fields(i).dataType match {
val castColumn = newReadDataSchema.fields(i).dataType match {
case DataTypes.BooleanType =>
castStringToBool(table.getColumn(i))
case DataTypes.ByteType =>
castStringToInt(table.getColumn(i), DType.INT8)
case DataTypes.ShortType =>
castStringToInt(table.getColumn(i), DType.INT16)
case DataTypes.IntegerType =>
castStringToInt(table.getColumn(i), DType.INT32)
case DataTypes.LongType =>
castStringToInt(table.getColumn(i), DType.INT64)
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
case DataTypes.DoubleType =>
@@ -222,6 +232,16 @@

def castStringToBool(input: ColumnVector): ColumnVector

def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = {
withResource(input.isInteger(intType)) { isInt =>
withResource(input.castTo(intType)) { asInt =>
withResource(Scalar.fromNull(intType)) { nullValue =>
isInt.ifElse(asInt, nullValue)
}
}
}
}

/**
* Read the host buffer to GPU table
* @param dataBuffer host buffer to be read
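This is the core of the change: numeric columns are read from cuDF as strings, and castStringToInt masks any entry that is not a valid integer for the target width to null before casting, which is how malformed and out-of-range text becomes null rather than 0 or a wrapped value. A pure-Python mirror of those semantics (illustrative only; the real code runs columnar on the GPU via cuDF's isInteger and ifElse):

    def cast_string_to_int(values, bits):
        lo, hi = -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
        out = []
        for v in values:
            try:
                n = int(v)  # rejects non-integer text: '3.141', 'true', ''
            except (TypeError, ValueError):
                out.append(None)
                continue
            out.append(n if lo <= n <= hi else None)  # rejects out-of-range values
        return out

    print(cast_string_to_int(['0', '127', '128', '3.141', None], bits=8))
    # [0, 127, None, None, None]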
55 changes: 0 additions & 55 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -890,47 +890,6 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_BOOLS = conf("spark.rapids.sql.csv.read.bool.enabled")
.doc("Parsing an invalid CSV boolean value produces true instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_BYTES = conf("spark.rapids.sql.csv.read.byte.enabled")
.doc("Parsing CSV bytes is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_SHORTS = conf("spark.rapids.sql.csv.read.short.enabled")
.doc("Parsing CSV shorts is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_INTEGERS = conf("spark.rapids.sql.csv.read.integer.enabled")
.doc("Parsing CSV integers is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_LONGS = conf("spark.rapids.sql.csv.read.long.enabled")
.doc("Parsing CSV longs is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_FLOATS = conf("spark.rapids.sql.csv.read.float.enabled")
.doc("Parsing CSV floats has some issues at the min and max values for floating" +
"point numbers and can be more lenient on parsing inf and -inf values")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_DOUBLES = conf("spark.rapids.sql.csv.read.double.enabled")
.doc("Parsing CSV double has some issues at the min and max values for floating" +
"point numbers and can be more lenient on parsing inf and -inf values")
.booleanConf
.createWithDefault(false)

val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled")
.doc("When set to true enables all json input and output acceleration. " +
"(only input is currently supported anyways)")
@@ -1621,20 +1580,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCsvDateReadEnabled: Boolean = get(ENABLE_READ_CSV_DATES)

lazy val isCsvBoolReadEnabled: Boolean = get(ENABLE_READ_CSV_BOOLS)

lazy val isCsvByteReadEnabled: Boolean = get(ENABLE_READ_CSV_BYTES)

lazy val isCsvShortReadEnabled: Boolean = get(ENABLE_READ_CSV_SHORTS)

lazy val isCsvIntReadEnabled: Boolean = get(ENABLE_READ_CSV_INTEGERS)

lazy val isCsvLongReadEnabled: Boolean = get(ENABLE_READ_CSV_LONGS)

lazy val isCsvFloatReadEnabled: Boolean = get(ENABLE_READ_CSV_FLOATS)

lazy val isCsvDoubleReadEnabled: Boolean = get(ENABLE_READ_CSV_DOUBLES)

lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING)

lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST)
@@ -2115,13 +2115,6 @@ object SupportedOpsForTools {
val readOps = types.map { t =>
val typeEnabled = if (format.toString.toLowerCase.equals("csv")) {
t.toString match {
case "BOOLEAN" => conf.isCsvBoolReadEnabled
case "BYTE" => conf.isCsvByteReadEnabled
case "SHORT" => conf.isCsvShortReadEnabled
case "INT" => conf.isCsvIntReadEnabled
case "LONG" => conf.isCsvLongReadEnabled
case "FLOAT" => conf.isCsvFloatReadEnabled
case "DOUBLE" => conf.isCsvDoubleReadEnabled
case "TIMESTAMP" => conf.isCsvTimestampReadEnabled
case "DATE" => conf.isCsvDateReadEnabled
case _ => true