From c735beef6bde256933d414d5c8b89254266bf15a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 14 Feb 2022 11:56:02 -0700 Subject: [PATCH 1/9] Improve JSON and CSV support for boolean values Signed-off-by: Andy Grove --- integration_tests/src/main/python/csv_test.py | 2 +- integration_tests/src/main/python/json_test.py | 7 ++++++- integration_tests/src/test/resources/boolean.json | 3 +++ .../src/test/resources/boolean_invalid.json | 9 +++++++++ .../src/main/scala/com/nvidia/spark/rapids/GpuCast.scala | 2 +- .../spark/rapids/GpuTextBasedPartitionReader.scala | 8 +++++--- 6 files changed, 25 insertions(+), 6 deletions(-) create mode 100644 integration_tests/src/test/resources/boolean.json create mode 100644 integration_tests/src/test/resources/boolean_invalid.json diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 9101ef56760..49c16191333 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -231,7 +231,7 @@ def read_impl(spark): pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}), - pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')), + pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}), pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')), pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')) ], ids=idfn) diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 563d713cc04..a349e3b3e67 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -38,6 +38,9 @@ 'spark.rapids.sql.format.json.enabled': 'true', 'spark.rapids.sql.format.json.read.enabled': 'true'} +_bool_schema = StructType([ + StructField('number', BooleanType())]) + _float_schema = StructType([ StructField('number', FloatType())]) @@ -170,6 +173,8 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena @approximate_float @pytest.mark.parametrize('filename', [ + 'boolean.json', + pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')), 'nan_and_inf.json', pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')), 'floats.json', @@ -177,7 +182,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena 'floats_invalid.json', pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')), ]) -@pytest.mark.parametrize('schema', [_float_schema, _double_schema]) +@pytest.mark.parametrize('schema', [_bool_schema, _float_schema, _double_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"]) @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"]) diff --git a/integration_tests/src/test/resources/boolean.json b/integration_tests/src/test/resources/boolean.json new file mode 100644 index 00000000000..dd7ffda8970 --- /dev/null +++ b/integration_tests/src/test/resources/boolean.json @@ -0,0 +1,3 @@ +{ "number": true } +{ "number": false } +{ "number": null } \ No newline at end of file diff --git a/integration_tests/src/test/resources/boolean_invalid.json b/integration_tests/src/test/resources/boolean_invalid.json new file mode 100644 index 00000000000..f3e017baf91 --- /dev/null +++ b/integration_tests/src/test/resources/boolean_invalid.json @@ -0,0 +1,9 @@ +{ "number": "true" } +{ "number": "false" } +{ "number": "null" } +{ "number": "" } +{ "number": "True" } +{ "number": "TRUE" } +{ "number": "False" } +{ "number": "FALSE" } +{ "number": "BAD" } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 47f4015e23c..0c03f4b8d0c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -849,7 +849,7 @@ object GpuCast extends Arm { } } - private def castStringToBool(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { + def castStringToBool(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { val trueStrings = Seq("t", "true", "y", "yes", "1") val falseStrings = Seq("f", "false", "n", "no", "0") val boolStrings = trueStrings ++ falseStrings diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 515db04fc11..f7978631f32 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -166,11 +166,11 @@ abstract class GpuTextBasedPartitionReader( readDataSchema } - // read floating-point columns as strings in cuDF + // read boolean and floating-point columns as strings in cuDF val dataSchemaWithStrings = StructType(dataSchema.fields .map(f => { f.dataType match { - case DataTypes.FloatType | DataTypes.DoubleType => + case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.DoubleType => f.copy(dataType = DataTypes.StringType) case _ => f @@ -188,7 +188,7 @@ abstract class GpuTextBasedPartitionReader( } maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) - // parse floating-point columns that were read as strings + // parse boolean and floating-point columns that were read as strings val castTable = withResource(table) { _ => val columns = new ListBuffer[ColumnVector]() // Table increases the ref counts on the columns so we have @@ -198,6 +198,8 @@ abstract class GpuTextBasedPartitionReader( val ansiEnabled = false for (i <- 0 until table.getNumberOfColumns) { val castColumn = dataSchema.fields(i).dataType match { + case DataTypes.BooleanType => + GpuCast.castStringToBool(table.getColumn(i), ansiEnabled) case DataTypes.FloatType => GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32) case DataTypes.DoubleType => From 1231fc6e2e1190d2beafd6af00123e609b648adb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 14 Feb 2022 16:14:05 -0700 Subject: [PATCH 2/9] implement custom boolean parsing logic for CSV and JSON --- .../src/test/resources/boolean.json | 10 ++++++- .../test/resources/simple_boolean_values.csv | 8 ++++++ .../spark/rapids/GpuBatchScanExec.scala | 26 ++++++++++++++++++- .../com/nvidia/spark/rapids/GpuCast.scala | 2 +- .../rapids/GpuTextBasedPartitionReader.scala | 4 ++- .../catalyst/json/rapids/GpuJsonScan.scala | 22 +++++++++++++++- 6 files changed, 67 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/test/resources/boolean.json b/integration_tests/src/test/resources/boolean.json index dd7ffda8970..fc49e251617 100644 --- a/integration_tests/src/test/resources/boolean.json +++ b/integration_tests/src/test/resources/boolean.json @@ -1,3 +1,11 @@ { "number": true } +{ "number": True } +{ "number": TRUE } { "number": false } -{ "number": null } \ No newline at end of file +{ "number": False } +{ "number": FALSE } +{ "number": null } +{ "number": y } +{ "number": n } +{ "number": 0 } +{ "number": 1 } \ No newline at end of file diff --git a/integration_tests/src/test/resources/simple_boolean_values.csv b/integration_tests/src/test/resources/simple_boolean_values.csv index 4b8ec5a4e16..d841b528c7d 100644 --- a/integration_tests/src/test/resources/simple_boolean_values.csv +++ b/integration_tests/src/test/resources/simple_boolean_values.csv @@ -7,3 +7,11 @@ False TRUE FALSE BAD +y +n +yes +no +1 +0 +t +f diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index c5b2be30471..fe1cc939653 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import ai.rapids.cudf -import ai.rapids.cudf.{HostMemoryBuffer, Schema, Table} +import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -423,4 +423,28 @@ class CSVPartitionReader( * @return the file format short name */ override def getFileFormatShortName: String = "CSV" + + /** + * CSV supports "true" and "false" (case-insensitive) as valid boolean values. + */ + override def castStringToBool(input: ColumnVector): ColumnVector = { + withResource(input.strip()) { stripped => + withResource(stripped.lower()) { lower => + withResource(Scalar.fromString("true")) { t => + withResource(Scalar.fromString("false")) { f => + withResource(lower.equalTo(t)) { isTrue => + withResource(lower.equalTo(f)) { isFalse => + withResource(isTrue.or(isFalse)) { isValidBool => + withResource(Scalar.fromNull(DType.BOOL8)) { nullBool => + isValidBool.ifElse(isTrue, nullBool) + } + } + } + } + } + } + } + } + } + } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 0c03f4b8d0c..47f4015e23c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -849,7 +849,7 @@ object GpuCast extends Arm { } } - def castStringToBool(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { + private def castStringToBool(input: ColumnVector, ansiEnabled: Boolean): ColumnVector = { val trueStrings = Seq("t", "true", "y", "yes", "1") val falseStrings = Seq("f", "false", "n", "no", "0") val boolStrings = trueStrings ++ falseStrings diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index f7978631f32..4c522ee248d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -199,7 +199,7 @@ abstract class GpuTextBasedPartitionReader( for (i <- 0 until table.getNumberOfColumns) { val castColumn = dataSchema.fields(i).dataType match { case DataTypes.BooleanType => - GpuCast.castStringToBool(table.getColumn(i), ansiEnabled) + castStringToBool(table.getColumn(i)) case DataTypes.FloatType => GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32) case DataTypes.DoubleType => @@ -220,6 +220,8 @@ abstract class GpuTextBasedPartitionReader( } } + def castStringToBool(input: ColumnVector): ColumnVector + /** * Read the host buffer to GPU table * @param dataBuffer host buffer to be read diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index ed7edeba396..10ed3368371 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import ai.rapids.cudf -import ai.rapids.cudf.{HostMemoryBuffer, Schema, Table} +import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table} import com.nvidia.spark.rapids._ import org.apache.hadoop.conf.Configuration @@ -334,4 +334,24 @@ class JsonPartitionReader( Some(new Table(prunedColumnVectors: _*)) } } + + /** + * JSON only supports unquoted lower-case "true" and "false" as valid boolean values. + */ + override def castStringToBool(input: ColumnVector): ColumnVector = { + withResource(Scalar.fromString("true")) { t => + withResource(Scalar.fromString("false")) { f => + withResource(input.equalTo(t)) { isTrue => + withResource(input.equalTo(f)) { isFalse => + withResource(isTrue.or(isFalse)) { isValidBool => + withResource(Scalar.fromNull(DType.BOOL8)) { nullBool => + isValidBool.ifElse(isTrue, nullBool) + } + } + } + } + } + } + } + } From 5bfdd3d7aa02b218ccf580ad5d6fff122e637bdc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Feb 2022 09:16:13 -0700 Subject: [PATCH 3/9] Improve JSON and CSV parsing of integer values Signed-off-by: Andy Grove --- integration_tests/src/main/python/csv_test.py | 16 +++++------ .../src/main/python/json_test.py | 15 +++++++++- .../src/test/resources/ints.json | 9 ++++++ .../rapids/GpuTextBasedPartitionReader.scala | 28 ++++++++++++++++--- 4 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 integration_tests/src/test/resources/ints.json diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 49c16191333..62af9215598 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -214,16 +214,16 @@ def read_impl(spark): pytest.param('trucks-missing-quotes.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')), pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true', 'nullValue': 'null'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2068')), pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), - pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), - pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), - pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), - pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), + pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}), + pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}), + pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}), + pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}), ('simple_int_values.csv', _float_schema, {'header': 'true'}), ('simple_int_values.csv', _double_schema, {'header': 'true'}), - pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), - pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), - pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), - pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), + pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}), + pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}), + pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}), + pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}), pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}), pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')), diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index a349e3b3e67..823d8d0a2a2 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -41,6 +41,18 @@ _bool_schema = StructType([ StructField('number', BooleanType())]) +_byte_schema = StructType([ + StructField('number', ByteType())]) + +_short_schema = StructType([ + StructField('number', ShortType())]) + +_int_schema = StructType([ + StructField('number', IntegerType())]) + +_long_schema = StructType([ + StructField('number', LongType())]) + _float_schema = StructType([ StructField('number', FloatType())]) @@ -175,6 +187,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena @pytest.mark.parametrize('filename', [ 'boolean.json', pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')), + 'ints.json', 'nan_and_inf.json', pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')), 'floats.json', @@ -182,7 +195,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena 'floats_invalid.json', pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')), ]) -@pytest.mark.parametrize('schema', [_bool_schema, _float_schema, _double_schema]) +@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema]) @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql]) @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"]) @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"]) diff --git a/integration_tests/src/test/resources/ints.json b/integration_tests/src/test/resources/ints.json new file mode 100644 index 00000000000..60f1b898fb7 --- /dev/null +++ b/integration_tests/src/test/resources/ints.json @@ -0,0 +1,9 @@ +{ "number": 0 } +{ "number": -128 } +{ "number": 127 } +{ "number": -32768 } +{ "number": 32767 } +{ "number": -2147483648 } +{ "number": 2147483647 } +{ "number": -9223372036854775808 } +{ "number": 9223372036854775807 } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 4c522ee248d..558183e1b9b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import scala.math.max -import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table} +import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.CompressionCodecFactory @@ -166,11 +166,13 @@ abstract class GpuTextBasedPartitionReader( readDataSchema } - // read boolean and floating-point columns as strings in cuDF + // read boolean and numeric columns as strings in cuDF val dataSchemaWithStrings = StructType(dataSchema.fields .map(f => { f.dataType match { - case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.DoubleType => + case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | + DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | + DataTypes.DoubleType => f.copy(dataType = DataTypes.StringType) case _ => f @@ -188,7 +190,7 @@ abstract class GpuTextBasedPartitionReader( } maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) - // parse boolean and floating-point columns that were read as strings + // parse boolean and numeric columns that were read as strings val castTable = withResource(table) { _ => val columns = new ListBuffer[ColumnVector]() // Table increases the ref counts on the columns so we have @@ -200,6 +202,14 @@ abstract class GpuTextBasedPartitionReader( val castColumn = dataSchema.fields(i).dataType match { case DataTypes.BooleanType => castStringToBool(table.getColumn(i)) + case DataTypes.ByteType => + castStringToInt(table.getColumn(i), DType.INT8) + case DataTypes.ShortType => + castStringToInt(table.getColumn(i), DType.INT16) + case DataTypes.IntegerType => + castStringToInt(table.getColumn(i), DType.INT32) + case DataTypes.LongType => + castStringToInt(table.getColumn(i), DType.INT64) case DataTypes.FloatType => GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32) case DataTypes.DoubleType => @@ -222,6 +232,16 @@ abstract class GpuTextBasedPartitionReader( def castStringToBool(input: ColumnVector): ColumnVector + def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { + withResource(input.isInteger(intType)) { isInt => + withResource(input.castTo(intType)) { asInt => + withResource(Scalar.fromNull(intType)) { nullValue => + isInt.ifElse(asInt, nullValue) + } + } + } + } + /** * Read the host buffer to GPU table * @param dataBuffer host buffer to be read From 98e2322269a84b9cba4ad9f39e452595e1316d82 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Feb 2022 11:13:16 -0700 Subject: [PATCH 4/9] Add more tests. Remove csv configs that are no longer needed Signed-off-by: Andy Grove --- docs/configs.md | 7 --- integration_tests/src/main/python/csv_test.py | 4 ++ .../tests/mortgage/MortgageSparkSuite.scala | 6 -- .../spark/rapids/GpuBatchScanExec.scala | 46 ++++------------ .../rapids/GpuTextBasedPartitionReader.scala | 16 ++---- .../com/nvidia/spark/rapids/RapidsConf.scala | 55 ------------------- .../com/nvidia/spark/rapids/TypeChecks.scala | 7 --- .../catalyst/json/rapids/GpuJsonScan.scala | 10 ++++ .../nvidia/spark/rapids/CsvScanSuite.scala | 14 ++--- .../rapids/SparkQueryCompareTestSuite.scala | 6 -- .../main/resources/supportedDataSource.csv | 2 +- 11 files changed, 35 insertions(+), 138 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 8469f482041..a6e826a98aa 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -67,14 +67,7 @@ Name | Description | Default Value spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1 -spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false -spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false -spark.rapids.sql.csv.read.double.enabled|Parsing CSV double has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false -spark.rapids.sql.csv.read.float.enabled|Parsing CSV floats has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false -spark.rapids.sql.csv.read.integer.enabled|Parsing CSV integers is much more lenient and will return 0 for some malformed values instead of null|false -spark.rapids.sql.csv.read.long.enabled|Parsing CSV longs is much more lenient and will return 0 for some malformed values instead of null|false -spark.rapids.sql.csv.read.short.enabled|Parsing CSV shorts is much more lenient and will return 0 for some malformed values instead of null|false spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py index 62af9215598..67c5e7344b4 100644 --- a/integration_tests/src/main/python/csv_test.py +++ b/integration_tests/src/main/python/csv_test.py @@ -229,6 +229,10 @@ def read_impl(spark): pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')), pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}), pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _byte_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _short_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _int_schema, {'header': 'true'}), + pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}), pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}), pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}), diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala index 145905df727..f1265e970d9 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala @@ -48,12 +48,6 @@ class MortgageSparkSuite extends FunSuite { .config("spark.rapids.sql.incompatibleOps.enabled", true) .config("spark.rapids.sql.hasNans", false) .config("spark.rapids.sql.csv.read.date.enabled", true) - .config("spark.rapids.sql.csv.read.byte.enabled", true) - .config("spark.rapids.sql.csv.read.short.enabled", true) - .config("spark.rapids.sql.csv.read.integer.enabled", true) - .config("spark.rapids.sql.csv.read.long.enabled", true) - .config("spark.rapids.sql.csv.read.float.enabled", true) - .config("spark.rapids.sql.csv.read.double.enabled", true) val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass val prop = System.getProperty("rapids.shuffle.manager.override", "false") if (prop.equalsIgnoreCase("true")) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index fe1cc939653..fdcf9fb886b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -222,41 +222,6 @@ object GpuCSVScan { } } - if (!meta.conf.isCsvBoolReadEnabled && readSchema.map(_.dataType).contains(BooleanType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading boolean. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BOOLS} to true.") - } - - if (!meta.conf.isCsvByteReadEnabled && readSchema.map(_.dataType).contains(ByteType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading bytes. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BYTES} to true.") - } - - if (!meta.conf.isCsvShortReadEnabled && readSchema.map(_.dataType).contains(ShortType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading shorts. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_SHORTS} to true.") - } - - if (!meta.conf.isCsvIntReadEnabled && readSchema.map(_.dataType).contains(IntegerType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading integers. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_INTEGERS} to true.") - } - - if (!meta.conf.isCsvLongReadEnabled && readSchema.map(_.dataType).contains(LongType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading longs. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_LONGS} to true.") - } - - if (!meta.conf.isCsvFloatReadEnabled && readSchema.map(_.dataType).contains(FloatType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading floats. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_FLOATS} to true.") - } - - if (!meta.conf.isCsvDoubleReadEnabled && readSchema.map(_.dataType).contains(DoubleType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading doubles. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DOUBLES} to true.") - } - if (readSchema.map(_.dataType).contains(TimestampType)) { if (!meta.conf.isCsvTimestampReadEnabled) { meta.willNotWorkOnGpu("GpuCSVScan does not support parsing timestamp types. To " + @@ -447,4 +412,15 @@ class CSVPartitionReader( } } + override def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { + // TODO this is still a work-in-progress and does not cover some edge cases yet + withResource(input.isInteger(intType)) { isInt => + withResource(input.castTo(intType)) { asInt => + withResource(Scalar.fromNull(intType)) { nullValue => + isInt.ifElse(asInt, nullValue) + } + } + } + } + } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 558183e1b9b..a9f8be803e8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import scala.math.max -import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} +import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.CompressionCodecFactory @@ -167,7 +167,7 @@ abstract class GpuTextBasedPartitionReader( } // read boolean and numeric columns as strings in cuDF - val dataSchemaWithStrings = StructType(dataSchema.fields + val dataSchemaWithStrings = StructType(newReadDataSchema.fields .map(f => { f.dataType match { case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | @@ -199,7 +199,7 @@ abstract class GpuTextBasedPartitionReader( // ansi mode does not apply to text inputs val ansiEnabled = false for (i <- 0 until table.getNumberOfColumns) { - val castColumn = dataSchema.fields(i).dataType match { + val castColumn = newReadDataSchema.fields(i).dataType match { case DataTypes.BooleanType => castStringToBool(table.getColumn(i)) case DataTypes.ByteType => @@ -232,15 +232,7 @@ abstract class GpuTextBasedPartitionReader( def castStringToBool(input: ColumnVector): ColumnVector - def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { - withResource(input.isInteger(intType)) { isInt => - withResource(input.castTo(intType)) { asInt => - withResource(Scalar.fromNull(intType)) { nullValue => - isInt.ifElse(asInt, nullValue) - } - } - } - } + def castStringToInt(input: ColumnVector, intType: DType): ColumnVector /** * Read the host buffer to GPU table diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 2afacb899e6..d403dbf653d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -890,47 +890,6 @@ object RapidsConf { .booleanConf .createWithDefault(false) - val ENABLE_READ_CSV_BOOLS = conf("spark.rapids.sql.csv.read.bool.enabled") - .doc("Parsing an invalid CSV boolean value produces true instead of null") - .booleanConf - .createWithDefault(false) - - val ENABLE_READ_CSV_BYTES = conf("spark.rapids.sql.csv.read.byte.enabled") - .doc("Parsing CSV bytes is much more lenient and will return 0 for some " + - "malformed values instead of null") - .booleanConf - .createWithDefault(false) - - val ENABLE_READ_CSV_SHORTS = conf("spark.rapids.sql.csv.read.short.enabled") - .doc("Parsing CSV shorts is much more lenient and will return 0 for some " + - "malformed values instead of null") - .booleanConf - .createWithDefault(false) - - val ENABLE_READ_CSV_INTEGERS = conf("spark.rapids.sql.csv.read.integer.enabled") - .doc("Parsing CSV integers is much more lenient and will return 0 for some " + - "malformed values instead of null") - .booleanConf - .createWithDefault(false) - - val ENABLE_READ_CSV_LONGS = conf("spark.rapids.sql.csv.read.long.enabled") - .doc("Parsing CSV longs is much more lenient and will return 0 for some " + - "malformed values instead of null") - .booleanConf - .createWithDefault(false) - - val ENABLE_READ_CSV_FLOATS = conf("spark.rapids.sql.csv.read.float.enabled") - .doc("Parsing CSV floats has some issues at the min and max values for floating" + - "point numbers and can be more lenient on parsing inf and -inf values") - .booleanConf - .createWithDefault(false) - - val ENABLE_READ_CSV_DOUBLES = conf("spark.rapids.sql.csv.read.double.enabled") - .doc("Parsing CSV double has some issues at the min and max values for floating" + - "point numbers and can be more lenient on parsing inf and -inf values") - .booleanConf - .createWithDefault(false) - val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled") .doc("When set to true enables all json input and output acceleration. " + "(only input is currently supported anyways)") @@ -1621,20 +1580,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCsvDateReadEnabled: Boolean = get(ENABLE_READ_CSV_DATES) - lazy val isCsvBoolReadEnabled: Boolean = get(ENABLE_READ_CSV_BOOLS) - - lazy val isCsvByteReadEnabled: Boolean = get(ENABLE_READ_CSV_BYTES) - - lazy val isCsvShortReadEnabled: Boolean = get(ENABLE_READ_CSV_SHORTS) - - lazy val isCsvIntReadEnabled: Boolean = get(ENABLE_READ_CSV_INTEGERS) - - lazy val isCsvLongReadEnabled: Boolean = get(ENABLE_READ_CSV_LONGS) - - lazy val isCsvFloatReadEnabled: Boolean = get(ENABLE_READ_CSV_FLOATS) - - lazy val isCsvDoubleReadEnabled: Boolean = get(ENABLE_READ_CSV_DOUBLES) - lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING) lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 30885828852..873c7b4161d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -2114,13 +2114,6 @@ object SupportedOpsForTools { val readOps = types.map { t => val typeEnabled = if (format.toString.toLowerCase.equals("csv")) { t.toString match { - case "BOOLEAN" => conf.isCsvBoolReadEnabled - case "BYTE" => conf.isCsvByteReadEnabled - case "SHORT" => conf.isCsvShortReadEnabled - case "INT" => conf.isCsvIntReadEnabled - case "LONG" => conf.isCsvLongReadEnabled - case "FLOAT" => conf.isCsvFloatReadEnabled - case "DOUBLE" => conf.isCsvDoubleReadEnabled case "TIMESTAMP" => conf.isCsvTimestampReadEnabled case "DATE" => conf.isCsvDateReadEnabled case _ => true diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 10ed3368371..6a25e3a32ff 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -354,4 +354,14 @@ class JsonPartitionReader( } } + override def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { + withResource(input.isInteger(intType)) { isInt => + withResource(input.castTo(intType)) { asInt => + withResource(Scalar.fromNull(intType)) { nullValue => + isInt.ifElse(asInt, nullValue) + } + } + } + } + } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala index 7b9210c081f..e082e61354c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,8 +31,7 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { testSparkResultsAreEqual("Test CSV splits with chunks", floatCsvDf, conf = new SparkConf() - .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1") - .set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")) { + .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) { frame => frame.select(col("floats")) } @@ -40,8 +39,7 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { "Test CSV count chunked by rows", intsFromCsv, conf = new SparkConf() - .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1") - .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) { + .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) { frameCount } @@ -49,8 +47,7 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { "Test CSV count chunked by bytes", intsFromCsv, conf = new SparkConf() - .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "0") - .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) { + .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "0")) { frameCount } @@ -61,8 +58,7 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { intsFromCsvInferredSchema, Seq("FileSourceScanExec", "FilterExec", "CollectLimitExec", "GreaterThan", "Length", "StringTrim", "LocalTableScanExec", "DeserializeToObjectExec", "Invoke", "AttributeReference", "Literal"), - conf = new SparkConf() - .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) { + conf = new SparkConf()) { frame => frame.select(col("*")) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index b8357c9db15..ea0828a8d64 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -151,12 +151,6 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { def enableCsvConf(): SparkConf = { new SparkConf() .set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true") - .set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true") - .set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true") - .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true") - .set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true") - .set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true") - .set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true") } // @see java.lang.Float#intBitsToFloat diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index 0ae3b52f898..4a42eede67c 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -1,5 +1,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT -CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA +CSV,read,S,S,S,S,S,S,S,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA JSON,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA From 4a7920ba9fd6214301e93faf4c4c9115227a926c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Feb 2022 14:43:35 -0700 Subject: [PATCH 5/9] fix regression --- .../com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index a9f8be803e8..2f012a72e0f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -167,7 +167,7 @@ abstract class GpuTextBasedPartitionReader( } // read boolean and numeric columns as strings in cuDF - val dataSchemaWithStrings = StructType(newReadDataSchema.fields + val dataSchemaWithStrings = StructType(dataSchema.fields .map(f => { f.dataType match { case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | From ded89c63f974cfc81cf3d5291198055ee4b4af45 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Feb 2022 15:00:51 -0700 Subject: [PATCH 6/9] remove duplicate code --- .../com/nvidia/spark/rapids/GpuBatchScanExec.scala | 12 ------------ .../spark/rapids/GpuTextBasedPartitionReader.scala | 12 ++++++++++-- .../spark/sql/catalyst/json/rapids/GpuJsonScan.scala | 11 ----------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index fdcf9fb886b..c99ef19f735 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -411,16 +411,4 @@ class CSVPartitionReader( } } } - - override def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { - // TODO this is still a work-in-progress and does not cover some edge cases yet - withResource(input.isInteger(intType)) { isInt => - withResource(input.castTo(intType)) { asInt => - withResource(Scalar.fromNull(intType)) { nullValue => - isInt.ifElse(asInt, nullValue) - } - } - } - } - } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 2f012a72e0f..7b079ee0618 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import scala.math.max -import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table} +import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.CompressionCodecFactory @@ -232,7 +232,15 @@ abstract class GpuTextBasedPartitionReader( def castStringToBool(input: ColumnVector): ColumnVector - def castStringToInt(input: ColumnVector, intType: DType): ColumnVector + def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { + withResource(input.isInteger(intType)) { isInt => + withResource(input.castTo(intType)) { asInt => + withResource(Scalar.fromNull(intType)) { nullValue => + isInt.ifElse(asInt, nullValue) + } + } + } + } /** * Read the host buffer to GPU table diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 6a25e3a32ff..ffbb5024882 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -353,15 +353,4 @@ class JsonPartitionReader( } } } - - override def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { - withResource(input.isInteger(intType)) { isInt => - withResource(input.castTo(intType)) { asInt => - withResource(Scalar.fromNull(intType)) { nullValue => - isInt.ifElse(asInt, nullValue) - } - } - } - } - } From f611dcd9bc2c3c22621b5099472a0eaa880b2f9b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Feb 2022 15:44:15 -0700 Subject: [PATCH 7/9] add XFAIL test for JSON strings containing integers --- integration_tests/src/main/python/json_test.py | 1 + .../src/test/resources/ints_invalid.json | 11 +++++++++++ 2 files changed, 12 insertions(+) create mode 100644 integration_tests/src/test/resources/ints_invalid.json diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 823d8d0a2a2..3e50eb0d73a 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -188,6 +188,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena 'boolean.json', pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')), 'ints.json', + pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')), 'nan_and_inf.json', pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')), 'floats.json', diff --git a/integration_tests/src/test/resources/ints_invalid.json b/integration_tests/src/test/resources/ints_invalid.json new file mode 100644 index 00000000000..9ab9944d983 --- /dev/null +++ b/integration_tests/src/test/resources/ints_invalid.json @@ -0,0 +1,11 @@ +{ "number": true } +{ "number": 3.141 } +{ "number": "0" } +{ "number": "-128" } +{ "number": "127" } +{ "number": "-32768" } +{ "number": "32767" } +{ "number": "-2147483648" } +{ "number": "2147483647" } +{ "number": "-9223372036854775808" } +{ "number": "9223372036854775807" } \ No newline at end of file From 5760a93ee0c81a88e909552fd2db0cfbaba8a68a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 17 Feb 2022 09:34:23 -0700 Subject: [PATCH 8/9] Save --- .../rapids/GpuTextBasedPartitionReader.scala | 10 ++++++++-- .../catalyst/json/rapids/GpuJsonScan.scala | 20 +++++++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 7b079ee0618..a7337a884ad 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -178,7 +178,12 @@ abstract class GpuTextBasedPartitionReader( f } })) - val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings) + val cudfDataSchema = GpuColumnVector.from(dataSchemaWithStrings) + +// println(s"dataSchema: ${dataSchema.fieldNames.mkString(", ")}") +// println(s"cudfSchema: ${cudfDataSchema.getColumnNames.mkString(", ")}") +// println(s"readDataSchema: ${readDataSchema.fieldNames.mkString(", ")}") +// println(s"newReadDataSchema: ${newReadDataSchema.fieldNames.mkString(", ")}") // about to start using the GPU GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME)) @@ -186,8 +191,9 @@ abstract class GpuTextBasedPartitionReader( // The buffer that is sent down val table = withResource(new NvtxWithMetrics(getFileFormatShortName + " decode", NvtxColor.DARK_GREEN, metrics(GPU_DECODE_TIME))) { _ => - readToTable(dataBuffer, dataSize, cudfSchema, newReadDataSchema, isFirstChunk) + readToTable(dataBuffer, dataSize, cudfDataSchema, newReadDataSchema, isFirstChunk) } +// println(s"table has ${table.getNumberOfColumns} columns") maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) // parse boolean and numeric columns that were read as strings diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 10ed3368371..795a7d8eeb3 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.json.rapids import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import ai.rapids.cudf import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table} @@ -40,7 +41,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration; +import org.apache.spark.util.SerializableConfiguration object GpuJsonScan { @@ -287,7 +288,22 @@ class JsonPartitionReader( hasHeader: Boolean): Table = { val jsonOpts = buildJsonOptions(parsedOptions) - Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize) + // cuDF does not yet support reading a subset of columns so we have + // to apply the read schema projection here + withResource(Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize)) { tbl => + val columns = new ListBuffer[ColumnVector]() + closeOnExcept(columns) { _ => + for (name <- readDataSchema.fieldNames) { + val i = cudfSchema.getColumnNames.indexOf(name) + if (i == -1) { + throw new IllegalStateException( + s"read schema contains field named '$name' that is not in the data schema") + } + columns += tbl.getColumn(i) + } + } + new Table(columns: _*) + } } /** From fd29d16b183d9ee1ac32f37ead8a98b1f60e87b0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 17 Feb 2022 09:35:15 -0700 Subject: [PATCH 9/9] Save --- .../spark/rapids/GpuTextBasedPartitionReader.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index a7337a884ad..7b079ee0618 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -178,12 +178,7 @@ abstract class GpuTextBasedPartitionReader( f } })) - val cudfDataSchema = GpuColumnVector.from(dataSchemaWithStrings) - -// println(s"dataSchema: ${dataSchema.fieldNames.mkString(", ")}") -// println(s"cudfSchema: ${cudfDataSchema.getColumnNames.mkString(", ")}") -// println(s"readDataSchema: ${readDataSchema.fieldNames.mkString(", ")}") -// println(s"newReadDataSchema: ${newReadDataSchema.fieldNames.mkString(", ")}") + val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings) // about to start using the GPU GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME)) @@ -191,9 +186,8 @@ abstract class GpuTextBasedPartitionReader( // The buffer that is sent down val table = withResource(new NvtxWithMetrics(getFileFormatShortName + " decode", NvtxColor.DARK_GREEN, metrics(GPU_DECODE_TIME))) { _ => - readToTable(dataBuffer, dataSize, cudfDataSchema, newReadDataSchema, isFirstChunk) + readToTable(dataBuffer, dataSize, cudfSchema, newReadDataSchema, isFirstChunk) } -// println(s"table has ${table.getNumberOfColumns} columns") maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) // parse boolean and numeric columns that were read as strings