From 1db50709394d4ae6f69ed69236a136f1ce6130a7 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 17 Feb 2022 15:24:28 -0700
Subject: [PATCH] Improve JSON and CSV parsing of integer values (#4790)

* Improve JSON and CSV support for boolean values

Signed-off-by: Andy Grove

* implement custom boolean parsing logic for CSV and JSON

* Improve JSON and CSV parsing of integer values

Signed-off-by: Andy Grove

* Add more tests. Remove csv configs that are no longer needed

Signed-off-by: Andy Grove

* fix regression

* remove duplicate code

* add XFAIL test for JSON strings containing integers

* Save

* Save
---
 docs/configs.md                               |  7 ---
 integration_tests/src/main/python/csv_test.py | 20 ++++---
 .../src/main/python/json_test.py              | 16 +++++-
 .../src/test/resources/ints.json              |  9 +++
 .../src/test/resources/ints_invalid.json      | 11 ++++
 .../tests/mortgage/MortgageSparkSuite.scala   |  6 --
 .../spark/rapids/GpuBatchScanExec.scala       | 35 ------------
 .../rapids/GpuTextBasedPartitionReader.scala  | 30 ++++++++--
 .../com/nvidia/spark/rapids/RapidsConf.scala  | 55 -------------------
 .../com/nvidia/spark/rapids/TypeChecks.scala  |  7 ---
 .../catalyst/json/rapids/GpuJsonScan.scala    | 20 ++++++-
 .../nvidia/spark/rapids/CsvScanSuite.scala    | 14 ++---
 .../rapids/SparkQueryCompareTestSuite.scala   |  6 --
 .../main/resources/supportedDataSource.csv    |  2 +-
 14 files changed, 96 insertions(+), 142 deletions(-)
 create mode 100644 integration_tests/src/test/resources/ints.json
 create mode 100644 integration_tests/src/test/resources/ints_invalid.json

diff --git a/docs/configs.md b/docs/configs.md
index 8469f482041..a6e826a98aa 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -67,14 +67,7 @@ Name | Description | Default Value
 spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
 spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
 spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
-spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false
-spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false
 spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
-spark.rapids.sql.csv.read.double.enabled|Parsing CSV double has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false
-spark.rapids.sql.csv.read.float.enabled|Parsing CSV floats has some issues at the min and max values for floatingpoint numbers and can be more lenient on parsing inf and -inf values|false
-spark.rapids.sql.csv.read.integer.enabled|Parsing CSV integers is much more lenient and will return 0 for some malformed values instead of null|false
-spark.rapids.sql.csv.read.long.enabled|Parsing CSV longs is much more lenient and will return 0 for some malformed values instead of null|false
-spark.rapids.sql.csv.read.short.enabled|Parsing CSV shorts is much more lenient and will return 0 for some malformed values instead of null|false
 spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
 spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
 spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
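For context, a minimal sketch of how the one remaining opt-in CSV read config is enabled on a session. This is illustrative only (the app name is made up) and mirrors the MortgageSparkSuite change further down, which keeps spark.rapids.sql.csv.read.date.enabled while dropping the per-numeric-type settings:

    import org.apache.spark.sql.SparkSession

    // After this patch, numeric CSV columns no longer need an opt-in config;
    // date parsing still does because its results can differ from Spark.
    val spark = SparkSession.builder()
      .appName("rapids-csv-example")
      .config("spark.rapids.sql.enabled", "true")
      .config("spark.rapids.sql.csv.read.date.enabled", "true")
      .getOrCreate()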
diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index 49c16191333..67c5e7344b4 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -214,21 +214,25 @@ def read_impl(spark):
     pytest.param('trucks-missing-quotes.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')),
     pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true', 'nullValue': 'null'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2068')),
     pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
-    pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
-    pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
-    pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}),
+    pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}),
+    pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}),
+    pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}),
     ('simple_int_values.csv', _float_schema, {'header': 'true'}),
     ('simple_int_values.csv', _double_schema, {'header': 'true'}),
-    pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}),
+    pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}),
+    pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}),
+    pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}),
     pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
     pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
     pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
     pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
     pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _byte_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _short_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _int_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}),
     pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
     pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
     pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}),
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index a349e3b3e67..3e50eb0d73a 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -41,6 +41,18 @@
 _bool_schema = StructType([
     StructField('number', BooleanType())])
 
+_byte_schema = StructType([
+    StructField('number', ByteType())])
+
+_short_schema = StructType([
+    StructField('number', ShortType())])
+
+_int_schema = StructType([
+    StructField('number', IntegerType())])
+
+_long_schema = StructType([
+    StructField('number', LongType())])
+
 _float_schema = StructType([
     StructField('number', FloatType())])
 
@@ -175,6 +187,8 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
 @pytest.mark.parametrize('filename', [
     'boolean.json',
     pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')),
+    'ints.json',
+    pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')),
     'nan_and_inf.json',
     pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
     'floats.json',
@@ -182,7 +196,7 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
     'floats_invalid.json',
     pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
     ])
-@pytest.mark.parametrize('schema', [_bool_schema, _float_schema, _double_schema])
+@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema])
 @pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
 @pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
 @pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
diff --git a/integration_tests/src/test/resources/ints.json b/integration_tests/src/test/resources/ints.json
new file mode 100644
index 00000000000..60f1b898fb7
--- /dev/null
+++ b/integration_tests/src/test/resources/ints.json
@@ -0,0 +1,9 @@
+{ "number": 0 }
+{ "number": -128 }
+{ "number": 127 }
+{ "number": -32768 }
+{ "number": 32767 }
+{ "number": -2147483648 }
+{ "number": 2147483647 }
+{ "number": -9223372036854775808 }
+{ "number": 9223372036854775807 }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/ints_invalid.json b/integration_tests/src/test/resources/ints_invalid.json
new file mode 100644
index 00000000000..9ab9944d983
--- /dev/null
+++ b/integration_tests/src/test/resources/ints_invalid.json
@@ -0,0 +1,11 @@
+{ "number": true }
+{ "number": 3.141 }
+{ "number": "0" }
+{ "number": "-128" }
+{ "number": "127" }
+{ "number": "-32768" }
+{ "number": "32767" }
+{ "number": "-2147483648" }
+{ "number": "2147483647" }
+{ "number": "-9223372036854775808" }
+{ "number": "9223372036854775807" }
\ No newline at end of file
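For reference, a minimal Scala sketch (not part of the patch) of how these new fixtures end up being read; the integration tests drive this through the Python read_json_df/read_json_sql helpers, so the explicit path and the SparkSession named spark below are illustrative:

    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // ints.json holds boundary values for byte/short/int/long; ints_invalid.json
    // holds booleans, a float, and quoted numbers that exercise the xfail case above.
    val schema = StructType(Seq(StructField("number", LongType)))
    val df = spark.read
      .schema(schema)
      .json("integration_tests/src/test/resources/ints.json")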
" + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BOOLS} to true.") - } - - if (!meta.conf.isCsvByteReadEnabled && readSchema.map(_.dataType).contains(ByteType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading bytes. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BYTES} to true.") - } - - if (!meta.conf.isCsvShortReadEnabled && readSchema.map(_.dataType).contains(ShortType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading shorts. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_SHORTS} to true.") - } - - if (!meta.conf.isCsvIntReadEnabled && readSchema.map(_.dataType).contains(IntegerType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading integers. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_INTEGERS} to true.") - } - - if (!meta.conf.isCsvLongReadEnabled && readSchema.map(_.dataType).contains(LongType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading longs. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_LONGS} to true.") - } - - if (!meta.conf.isCsvFloatReadEnabled && readSchema.map(_.dataType).contains(FloatType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading floats. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_FLOATS} to true.") - } - - if (!meta.conf.isCsvDoubleReadEnabled && readSchema.map(_.dataType).contains(DoubleType)) { - meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading doubles. " + - s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DOUBLES} to true.") - } - if (readSchema.map(_.dataType).contains(TimestampType)) { if (!meta.conf.isCsvTimestampReadEnabled) { meta.willNotWorkOnGpu("GpuCSVScan does not support parsing timestamp types. 
To " + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 4c522ee248d..7b079ee0618 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import scala.math.max -import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table} +import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.CompressionCodecFactory @@ -166,11 +166,13 @@ abstract class GpuTextBasedPartitionReader( readDataSchema } - // read boolean and floating-point columns as strings in cuDF + // read boolean and numeric columns as strings in cuDF val dataSchemaWithStrings = StructType(dataSchema.fields .map(f => { f.dataType match { - case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.DoubleType => + case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | + DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | + DataTypes.DoubleType => f.copy(dataType = DataTypes.StringType) case _ => f @@ -188,7 +190,7 @@ abstract class GpuTextBasedPartitionReader( } maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) - // parse boolean and floating-point columns that were read as strings + // parse boolean and numeric columns that were read as strings val castTable = withResource(table) { _ => val columns = new ListBuffer[ColumnVector]() // Table increases the ref counts on the columns so we have @@ -197,9 +199,17 @@ abstract class GpuTextBasedPartitionReader( // ansi mode does not apply to text inputs val ansiEnabled = false for (i <- 0 until table.getNumberOfColumns) { - val castColumn = dataSchema.fields(i).dataType match { + val castColumn = newReadDataSchema.fields(i).dataType match { case DataTypes.BooleanType => castStringToBool(table.getColumn(i)) + case DataTypes.ByteType => + castStringToInt(table.getColumn(i), DType.INT8) + case DataTypes.ShortType => + castStringToInt(table.getColumn(i), DType.INT16) + case DataTypes.IntegerType => + castStringToInt(table.getColumn(i), DType.INT32) + case DataTypes.LongType => + castStringToInt(table.getColumn(i), DType.INT64) case DataTypes.FloatType => GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32) case DataTypes.DoubleType => @@ -222,6 +232,16 @@ abstract class GpuTextBasedPartitionReader( def castStringToBool(input: ColumnVector): ColumnVector + def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = { + withResource(input.isInteger(intType)) { isInt => + withResource(input.castTo(intType)) { asInt => + withResource(Scalar.fromNull(intType)) { nullValue => + isInt.ifElse(asInt, nullValue) + } + } + } + } + /** * Read the host buffer to GPU table * @param dataBuffer host buffer to be read diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 2afacb899e6..d403dbf653d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -890,47 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 2afacb899e6..d403dbf653d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -890,47 +890,6 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(false)
 
-  val ENABLE_READ_CSV_BOOLS = conf("spark.rapids.sql.csv.read.bool.enabled")
-    .doc("Parsing an invalid CSV boolean value produces true instead of null")
-    .booleanConf
-    .createWithDefault(false)
-
-  val ENABLE_READ_CSV_BYTES = conf("spark.rapids.sql.csv.read.byte.enabled")
-    .doc("Parsing CSV bytes is much more lenient and will return 0 for some " +
-      "malformed values instead of null")
-    .booleanConf
-    .createWithDefault(false)
-
-  val ENABLE_READ_CSV_SHORTS = conf("spark.rapids.sql.csv.read.short.enabled")
-    .doc("Parsing CSV shorts is much more lenient and will return 0 for some " +
-      "malformed values instead of null")
-    .booleanConf
-    .createWithDefault(false)
-
-  val ENABLE_READ_CSV_INTEGERS = conf("spark.rapids.sql.csv.read.integer.enabled")
-    .doc("Parsing CSV integers is much more lenient and will return 0 for some " +
-      "malformed values instead of null")
-    .booleanConf
-    .createWithDefault(false)
-
-  val ENABLE_READ_CSV_LONGS = conf("spark.rapids.sql.csv.read.long.enabled")
-    .doc("Parsing CSV longs is much more lenient and will return 0 for some " +
-      "malformed values instead of null")
-    .booleanConf
-    .createWithDefault(false)
-
-  val ENABLE_READ_CSV_FLOATS = conf("spark.rapids.sql.csv.read.float.enabled")
-    .doc("Parsing CSV floats has some issues at the min and max values for floating" +
-      "point numbers and can be more lenient on parsing inf and -inf values")
-    .booleanConf
-    .createWithDefault(false)
-
-  val ENABLE_READ_CSV_DOUBLES = conf("spark.rapids.sql.csv.read.double.enabled")
-    .doc("Parsing CSV double has some issues at the min and max values for floating" +
-      "point numbers and can be more lenient on parsing inf and -inf values")
-    .booleanConf
-    .createWithDefault(false)
-
   val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled")
     .doc("When set to true enables all json input and output acceleration. " +
       "(only input is currently supported anyways)")
@@ -1621,20 +1580,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isCsvDateReadEnabled: Boolean = get(ENABLE_READ_CSV_DATES)
 
-  lazy val isCsvBoolReadEnabled: Boolean = get(ENABLE_READ_CSV_BOOLS)
-
-  lazy val isCsvByteReadEnabled: Boolean = get(ENABLE_READ_CSV_BYTES)
-
-  lazy val isCsvShortReadEnabled: Boolean = get(ENABLE_READ_CSV_SHORTS)
-
-  lazy val isCsvIntReadEnabled: Boolean = get(ENABLE_READ_CSV_INTEGERS)
-
-  lazy val isCsvLongReadEnabled: Boolean = get(ENABLE_READ_CSV_LONGS)
-
-  lazy val isCsvFloatReadEnabled: Boolean = get(ENABLE_READ_CSV_FLOATS)
-
-  lazy val isCsvDoubleReadEnabled: Boolean = get(ENABLE_READ_CSV_DOUBLES)
-
   lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING)
 
   lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
index 74d119f09d6..0ebfc072568 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -2115,13 +2115,6 @@ object SupportedOpsForTools {
     val readOps = types.map { t =>
       val typeEnabled = if (format.toString.toLowerCase.equals("csv")) {
         t.toString match {
-          case "BOOLEAN" => conf.isCsvBoolReadEnabled
-          case "BYTE" => conf.isCsvByteReadEnabled
-          case "SHORT" => conf.isCsvShortReadEnabled
-          case "INT" => conf.isCsvIntReadEnabled
-          case "LONG" => conf.isCsvLongReadEnabled
-          case "FLOAT" => conf.isCsvFloatReadEnabled
-          case "DOUBLE" => conf.isCsvDoubleReadEnabled
           case "TIMESTAMP" => conf.isCsvTimestampReadEnabled
           case "DATE" => conf.isCsvDateReadEnabled
           case _ => true
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
index 10ed3368371..795a7d8eeb3 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.json.rapids
 import java.nio.charset.StandardCharsets
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 import ai.rapids.cudf
 import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table}
@@ -40,7 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DateType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration;
+import org.apache.spark.util.SerializableConfiguration
 
 object GpuJsonScan {
 
@@ -287,7 +288,22 @@ class JsonPartitionReader(
       hasHeader: Boolean): Table = {
     val jsonOpts = buildJsonOptions(parsedOptions)
 
-    Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize)
+    // cuDF does not yet support reading a subset of columns so we have
+    // to apply the read schema projection here
+    withResource(Table.readJSON(cudfSchema, jsonOpts, dataBuffer, 0, dataSize)) { tbl =>
+      val columns = new ListBuffer[ColumnVector]()
+      closeOnExcept(columns) { _ =>
+        for (name <- readDataSchema.fieldNames) {
+          val i = cudfSchema.getColumnNames.indexOf(name)
+          if (i == -1) {
+            throw new IllegalStateException(
+              s"read schema contains field named '$name' that is not in the data schema")
+          }
+          columns += tbl.getColumn(i)
+        }
+      }
+      new Table(columns: _*)
+    }
   }
 
   /**
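For reference, a small self-contained Scala sketch (not part of the patch) of the projection idea above: cuDF returns columns in data-schema order, and the read schema picks them out by name, failing fast if a requested field is missing. The collection names below are illustrative stand-ins for cudfSchema.getColumnNames and tbl.getColumn(i):

    val dataColumns = Seq("a", "b", "c")   // order in which cuDF produced columns
    val readFields = Seq("c", "a")         // order the Spark read schema asks for
    val projection = readFields.map { name =>
      val i = dataColumns.indexOf(name)
      require(i >= 0, s"read schema contains field named '$name' that is not in the data schema")
      i
    }
    // projection == Seq(2, 0); the reader builds a new Table from these column indices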
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala
index 7b9210c081f..e082e61354c 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,8 +31,7 @@ class CsvScanSuite extends SparkQueryCompareTestSuite {
 
   testSparkResultsAreEqual("Test CSV splits with chunks", floatCsvDf,
       conf = new SparkConf()
-          .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")
-          .set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")) {
+          .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) {
     frame => frame.select(col("floats"))
   }
 
@@ -40,8 +39,7 @@
       "Test CSV count chunked by rows",
       intsFromCsv,
       conf = new SparkConf()
-          .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")
-          .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) {
+          .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) {
     frameCount
   }
 
@@ -49,8 +47,7 @@
       "Test CSV count chunked by bytes",
       intsFromCsv,
       conf = new SparkConf()
-          .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "0")
-          .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) {
+          .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "0")) {
     frameCount
   }
 
@@ -61,8 +58,7 @@
       intsFromCsvInferredSchema,
       Seq("FileSourceScanExec", "FilterExec", "CollectLimitExec", "GreaterThan", "Length",
         "StringTrim", "LocalTableScanExec", "DeserializeToObjectExec", "Invoke", "AttributeReference", "Literal"),
-      conf = new SparkConf()
-          .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) {
+      conf = new SparkConf()) {
     frame => frame.select(col("*"))
   }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index b8357c9db15..ea0828a8d64 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -151,12 +151,6 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
   def enableCsvConf(): SparkConf = {
     new SparkConf()
         .set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true")
-        .set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true")
-        .set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true")
-        .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")
-        .set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true")
-        .set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
-        .set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
   }
 
   // @see java.lang.Float#intBitsToFloat
diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv
index 0ae3b52f898..4a42eede67c 100644
--- a/tools/src/main/resources/supportedDataSource.csv
+++ b/tools/src/main/resources/supportedDataSource.csv
@@ -1,5 +1,5 @@
 Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT
-CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA
+CSV,read,S,S,S,S,S,S,S,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA
 JSON,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO,CO
 ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS
 ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA