Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve JSON and CSV parsing of integer values #4790

Merged
merged 11 commits into from
Feb 17, 2022
7 changes: 0 additions & 7 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,7 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csv.read.bool.enabled"></a>spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false
<a name="sql.csv.read.byte.enabled"></a>spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.date.enabled"></a>spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
<a name="sql.csv.read.double.enabled"></a>spark.rapids.sql.csv.read.double.enabled|Parsing CSV double has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false
<a name="sql.csv.read.float.enabled"></a>spark.rapids.sql.csv.read.float.enabled|Parsing CSV floats has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false
<a name="sql.csv.read.integer.enabled"></a>spark.rapids.sql.csv.read.integer.enabled|Parsing CSV integers is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.long.enabled"></a>spark.rapids.sql.csv.read.long.enabled|Parsing CSV longs is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.short.enabled"></a>spark.rapids.sql.csv.read.short.enabled|Parsing CSV shorts is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
Expand Down
20 changes: 12 additions & 8 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,25 @@ def read_impl(spark):
pytest.param('trucks-missing-quotes.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')),
pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true', 'nullValue': 'null'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2068')),
pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}),
pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}),
pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}),
pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}),
('simple_int_values.csv', _float_schema, {'header': 'true'}),
('simple_int_values.csv', _double_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _byte_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _short_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _int_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}),
Expand Down
16 changes: 15 additions & 1 deletion integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@
_bool_schema = StructType([
StructField('number', BooleanType())])

_byte_schema = StructType([
StructField('number', ByteType())])

_short_schema = StructType([
StructField('number', ShortType())])

_int_schema = StructType([
StructField('number', IntegerType())])

_long_schema = StructType([
StructField('number', LongType())])

_float_schema = StructType([
StructField('number', FloatType())])

Expand Down Expand Up @@ -175,14 +187,16 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
@pytest.mark.parametrize('filename', [
'boolean.json',
pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')),
'ints.json',
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')),
'nan_and_inf.json',
pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
])
@pytest.mark.parametrize('schema', [_bool_schema, _float_schema, _double_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
Expand Down
9 changes: 9 additions & 0 deletions integration_tests/src/test/resources/ints.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{ "number": 0 }
{ "number": -128 }
{ "number": 127 }
{ "number": -32768 }
{ "number": 32767 }
{ "number": -2147483648 }
{ "number": 2147483647 }
{ "number": -9223372036854775808 }
{ "number": 9223372036854775807 }
11 changes: 11 additions & 0 deletions integration_tests/src/test/resources/ints_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{ "number": true }
{ "number": 3.141 }
{ "number": "0" }
{ "number": "-128" }
{ "number": "127" }
{ "number": "-32768" }
{ "number": "32767" }
{ "number": "-2147483648" }
{ "number": "2147483647" }
{ "number": "-9223372036854775808" }
{ "number": "9223372036854775807" }
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ class MortgageSparkSuite extends FunSuite {
.config("spark.rapids.sql.incompatibleOps.enabled", true)
.config("spark.rapids.sql.hasNans", false)
.config("spark.rapids.sql.csv.read.date.enabled", true)
.config("spark.rapids.sql.csv.read.byte.enabled", true)
.config("spark.rapids.sql.csv.read.short.enabled", true)
.config("spark.rapids.sql.csv.read.integer.enabled", true)
.config("spark.rapids.sql.csv.read.long.enabled", true)
.config("spark.rapids.sql.csv.read.float.enabled", true)
.config("spark.rapids.sql.csv.read.double.enabled", true)
val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass
val prop = System.getProperty("rapids.shuffle.manager.override", "false")
if (prop.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,41 +222,6 @@ object GpuCSVScan {
}
}

if (!meta.conf.isCsvBoolReadEnabled && readSchema.map(_.dataType).contains(BooleanType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading boolean. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BOOLS} to true.")
}

if (!meta.conf.isCsvByteReadEnabled && readSchema.map(_.dataType).contains(ByteType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading bytes. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BYTES} to true.")
}

if (!meta.conf.isCsvShortReadEnabled && readSchema.map(_.dataType).contains(ShortType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading shorts. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_SHORTS} to true.")
}

if (!meta.conf.isCsvIntReadEnabled && readSchema.map(_.dataType).contains(IntegerType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading integers. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_INTEGERS} to true.")
}

if (!meta.conf.isCsvLongReadEnabled && readSchema.map(_.dataType).contains(LongType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading longs. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_LONGS} to true.")
}

if (!meta.conf.isCsvFloatReadEnabled && readSchema.map(_.dataType).contains(FloatType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading floats. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_FLOATS} to true.")
}

if (!meta.conf.isCsvDoubleReadEnabled && readSchema.map(_.dataType).contains(DoubleType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading doubles. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DOUBLES} to true.")
}

if (readSchema.map(_.dataType).contains(TimestampType)) {
if (!meta.conf.isCsvTimestampReadEnabled) {
meta.willNotWorkOnGpu("GpuCSVScan does not support parsing timestamp types. To " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import scala.collection.mutable.ListBuffer
import scala.math.max

import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand Down Expand Up @@ -166,11 +166,13 @@ abstract class GpuTextBasedPartitionReader(
readDataSchema
}

// read boolean and floating-point columns as strings in cuDF
// read boolean and numeric columns as strings in cuDF
val dataSchemaWithStrings = StructType(dataSchema.fields
.map(f => {
f.dataType match {
case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.DoubleType =>
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
DataTypes.DoubleType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
Expand All @@ -188,7 +190,7 @@ abstract class GpuTextBasedPartitionReader(
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)

// parse boolean and floating-point columns that were read as strings
// parse boolean and numeric columns that were read as strings
val castTable = withResource(table) { _ =>
val columns = new ListBuffer[ColumnVector]()
// Table increases the ref counts on the columns so we have
Expand All @@ -197,9 +199,17 @@ abstract class GpuTextBasedPartitionReader(
// ansi mode does not apply to text inputs
val ansiEnabled = false
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = dataSchema.fields(i).dataType match {
val castColumn = newReadDataSchema.fields(i).dataType match {
case DataTypes.BooleanType =>
castStringToBool(table.getColumn(i))
case DataTypes.ByteType =>
castStringToInt(table.getColumn(i), DType.INT8)
case DataTypes.ShortType =>
castStringToInt(table.getColumn(i), DType.INT16)
case DataTypes.IntegerType =>
castStringToInt(table.getColumn(i), DType.INT32)
case DataTypes.LongType =>
castStringToInt(table.getColumn(i), DType.INT64)
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
case DataTypes.DoubleType =>
Expand All @@ -222,6 +232,16 @@ abstract class GpuTextBasedPartitionReader(

def castStringToBool(input: ColumnVector): ColumnVector

def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = {
withResource(input.isInteger(intType)) { isInt =>
withResource(input.castTo(intType)) { asInt =>
withResource(Scalar.fromNull(intType)) { nullValue =>
isInt.ifElse(asInt, nullValue)
}
}
}
}

/**
* Read the host buffer to GPU table
* @param dataBuffer host buffer to be read
Expand Down
55 changes: 0 additions & 55 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -890,47 +890,6 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_BOOLS = conf("spark.rapids.sql.csv.read.bool.enabled")
.doc("Parsing an invalid CSV boolean value produces true instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_BYTES = conf("spark.rapids.sql.csv.read.byte.enabled")
.doc("Parsing CSV bytes is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_SHORTS = conf("spark.rapids.sql.csv.read.short.enabled")
.doc("Parsing CSV shorts is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_INTEGERS = conf("spark.rapids.sql.csv.read.integer.enabled")
.doc("Parsing CSV integers is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_LONGS = conf("spark.rapids.sql.csv.read.long.enabled")
.doc("Parsing CSV longs is much more lenient and will return 0 for some " +
"malformed values instead of null")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_FLOATS = conf("spark.rapids.sql.csv.read.float.enabled")
.doc("Parsing CSV floats has some issues at the min and max values for floating" +
"point numbers and can be more lenient on parsing inf and -inf values")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_DOUBLES = conf("spark.rapids.sql.csv.read.double.enabled")
.doc("Parsing CSV double has some issues at the min and max values for floating" +
"point numbers and can be more lenient on parsing inf and -inf values")
.booleanConf
.createWithDefault(false)

val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled")
.doc("When set to true enables all json input and output acceleration. " +
"(only input is currently supported anyways)")
Expand Down Expand Up @@ -1621,20 +1580,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCsvDateReadEnabled: Boolean = get(ENABLE_READ_CSV_DATES)

lazy val isCsvBoolReadEnabled: Boolean = get(ENABLE_READ_CSV_BOOLS)

lazy val isCsvByteReadEnabled: Boolean = get(ENABLE_READ_CSV_BYTES)

lazy val isCsvShortReadEnabled: Boolean = get(ENABLE_READ_CSV_SHORTS)

lazy val isCsvIntReadEnabled: Boolean = get(ENABLE_READ_CSV_INTEGERS)

lazy val isCsvLongReadEnabled: Boolean = get(ENABLE_READ_CSV_LONGS)

lazy val isCsvFloatReadEnabled: Boolean = get(ENABLE_READ_CSV_FLOATS)

lazy val isCsvDoubleReadEnabled: Boolean = get(ENABLE_READ_CSV_DOUBLES)

lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING)

lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2114,13 +2114,6 @@ object SupportedOpsForTools {
val readOps = types.map { t =>
val typeEnabled = if (format.toString.toLowerCase.equals("csv")) {
t.toString match {
case "BOOLEAN" => conf.isCsvBoolReadEnabled
case "BYTE" => conf.isCsvByteReadEnabled
case "SHORT" => conf.isCsvShortReadEnabled
case "INT" => conf.isCsvIntReadEnabled
case "LONG" => conf.isCsvLongReadEnabled
case "FLOAT" => conf.isCsvFloatReadEnabled
case "DOUBLE" => conf.isCsvDoubleReadEnabled
case "TIMESTAMP" => conf.isCsvTimestampReadEnabled
case "DATE" => conf.isCsvDateReadEnabled
case _ => true
Expand Down
Loading