From 0144d7100af2f3645d913b67c9a2dead653e297e Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 16 Apr 2021 06:37:25 -0500 Subject: [PATCH] Disable CSV parsing by default and update tests to better show what is left (#2072) Signed-off-by: Robert (Bobby) Evans --- docs/compatibility.md | 57 +++++- docs/configs.md | 8 + integration_tests/src/main/python/csv_test.py | 175 +++++++++++++++--- integration_tests/src/test/resources/date.csv | 4 + .../src/test/resources/empty.csv | 0 .../src/test/resources/empty_int_values.csv | 7 + .../test/resources/ints_with_whitespace.csv | 16 ++ .../src/test/resources/just_comments.csv | 6 + .../src/test/resources/nan_and_inf.csv | 8 + .../src/test/resources/no-comments.csv | 3 + .../test/resources/simple_boolean_values.csv | 9 + .../test/resources/simple_float_values.csv | 27 +++ .../src/test/resources/simple_int_values.csv | 21 +++ .../src/test/resources/trucks-blank-names.csv | 6 + .../src/test/resources/trucks-comments.csv | 8 + .../src/test/resources/trucks-different.csv | 6 + .../test/resources/trucks-empty-values.csv | 7 + .../test/resources/trucks-extra-columns.csv | 7 + .../test/resources/trucks-missing-quotes.csv | 7 + .../test/resources/trucks-more-comments.csv | 8 + .../src/test/resources/trucks-null.csv | 7 + .../src/test/resources/trucks-windows.csv | 7 + .../src/test/resources/trucks.csv | 7 + .../src/test/resources/trucks.tsv | 5 + .../tests/mortgage/MortgageSparkSuite.scala | 7 + .../spark/rapids/GpuBatchScanExec.scala | 90 ++++++--- .../com/nvidia/spark/rapids/RapidsConf.scala | 81 +++++++- .../nvidia/spark/rapids/CsvScanSuite.scala | 20 +- .../rapids/GpuCoalesceBatchesSuite.scala | 2 +- .../spark/rapids/HashAggregatesSuite.scala | 127 ++++++++----- .../nvidia/spark/rapids/LimitExecSuite.scala | 4 +- .../spark/rapids/ProjectExprSuite.scala | 4 +- .../spark/rapids/ScalarSubquerySuite.scala | 1 + .../nvidia/spark/rapids/SortExecSuite.scala | 6 +- .../rapids/SparkQueryCompareTestSuite.scala | 11 ++ 35 files changed, 640 insertions(+), 129 deletions(-) create mode 100644 integration_tests/src/test/resources/date.csv create mode 100644 integration_tests/src/test/resources/empty.csv create mode 100644 integration_tests/src/test/resources/empty_int_values.csv create mode 100644 integration_tests/src/test/resources/ints_with_whitespace.csv create mode 100644 integration_tests/src/test/resources/just_comments.csv create mode 100644 integration_tests/src/test/resources/nan_and_inf.csv create mode 100644 integration_tests/src/test/resources/no-comments.csv create mode 100644 integration_tests/src/test/resources/simple_boolean_values.csv create mode 100644 integration_tests/src/test/resources/simple_float_values.csv create mode 100644 integration_tests/src/test/resources/simple_int_values.csv create mode 100644 integration_tests/src/test/resources/trucks-blank-names.csv create mode 100644 integration_tests/src/test/resources/trucks-comments.csv create mode 100644 integration_tests/src/test/resources/trucks-different.csv create mode 100644 integration_tests/src/test/resources/trucks-empty-values.csv create mode 100644 integration_tests/src/test/resources/trucks-extra-columns.csv create mode 100644 integration_tests/src/test/resources/trucks-missing-quotes.csv create mode 100644 integration_tests/src/test/resources/trucks-more-comments.csv create mode 100644 integration_tests/src/test/resources/trucks-null.csv create mode 100644 integration_tests/src/test/resources/trucks-windows.csv create mode 100644 
integration_tests/src/test/resources/trucks.csv
 create mode 100644 integration_tests/src/test/resources/trucks.tsv
diff --git a/docs/compatibility.md b/docs/compatibility.md
index 8f97717d6da..c866c6e68dd 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -103,16 +103,41 @@ will produce a different result compared to the plugin.
 
 ## CSV Reading
 
-Spark is very strict when reading CSV and if the data does not conform with the expected format
-exactly it will result in a `null` value. The underlying parser that the SQL plugin uses is much
-more lenient. If you have badly formatted CSV data you may get data back instead of nulls. If this
-is a problem you can disable the CSV reader by setting the config
-[`spark.rapids.sql.format.csv.read.enabled`](configs.md#sql.format.csv.read.enabled) to `false`.
-Because the speed up is so large and the issues typically only show up in error conditions we felt
-it was worth having the CSV reader enabled by default.
+Due to inconsistencies between how the CPU and the GPU parse CSV data, CSV parsing is off by default.
+Each data type can be enabled or disabled independently using the following configs.
+
+ * [spark.rapids.sql.csv.read.bool.enabled](configs.md#sql.csv.read.bool.enabled)
+ * [spark.rapids.sql.csv.read.byte.enabled](configs.md#sql.csv.read.byte.enabled)
+ * [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled)
+ * [spark.rapids.sql.csv.read.double.enabled](configs.md#sql.csv.read.double.enabled)
+ * [spark.rapids.sql.csv.read.float.enabled](configs.md#sql.csv.read.float.enabled)
+ * [spark.rapids.sql.csv.read.integer.enabled](configs.md#sql.csv.read.integer.enabled)
+ * [spark.rapids.sql.csv.read.long.enabled](configs.md#sql.csv.read.long.enabled)
+ * [spark.rapids.sql.csv.read.short.enabled](configs.md#sql.csv.read.short.enabled)
+ * [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)
+
+If you know that a particular data type will be parsed correctly for your data, you can enable each
+type you expect to use. The performance improvement is often large enough that it is worth
+checking whether your data is parsed correctly.
+
+Spark is generally very strict when reading CSV, and if the data does not conform to the
+expected format exactly it will result in a `null` value. The underlying parser that the RAPIDS Accelerator
+uses is much more lenient. If you have badly formatted CSV data you may get data back instead of
+nulls.
+
+Spark allows stripping leading and trailing white space using various options that are off by
+default. The plugin strips leading and trailing white space for all values except strings.
 
 There are also discrepancies/issues with specific types that are detailed below.
 
+### CSV Boolean
+
+Invalid values like `BAD` show up as `true`, as described in this
+[issue](https://github.com/NVIDIA/spark-rapids/issues/2071).
+
+This is the same for all of the other types, but because it is the only issue with boolean parsing
+we have called it out specifically here.
+
 ### CSV Strings
 Writing strings to a CSV file in general for Spark can be problematic unless you can ensure that
 your data does not have any line deliminators in it. The GPU accelerated CSV parser handles quoted
@@ -140,7 +165,12 @@ Only a limited set of formats are supported when parsing dates.
 
 The reality is that all of these formats are supported at the same time. The plugin will only
 disable itself if you set a format that it does not support.
 
-As a work around you can parse the column as a timestamp and then cast it to a date.
+As a workaround you can parse the column as a timestamp and then cast it to a date.
+
+In Spark, invalid dates (values that have the correct format but whose numbers do not form a real
+date) can result in an exception by default, and how they are parsed can be controlled through a
+config. The RAPIDS Accelerator does not support any of this and will instead produce an incorrect
+date, typically one that has overflowed.
 
 ### CSV Timestamps
 The CSV parser does not support time zones. It will ignore any trailing time zone information,
@@ -163,9 +193,14 @@ portion followed by one of the following formats:
 
 Just like with dates all timestamp formats are actually supported at the same time. The plugin will
 disable itself if it sees a format it cannot support.
 
+In Spark, invalid timestamps (ones that have the correct format but whose numbers do not form a
+real date or time) can result in an exception by default, and how they are parsed can be controlled
+through a config. The RAPIDS Accelerator does not support any of this and will instead produce an
+incorrect timestamp, typically one that has overflowed.
+
 ### CSV Floating Point
 
-The CSV parser is not able to parse `Infinity`, `-Infinity`, or `NaN` values. All of these are
+The CSV parser is not able to parse `NaN` values. These are
 likely to be turned into null values, as described in this
 [issue](https://github.com/NVIDIA/spark-rapids/issues/125).
 
@@ -174,6 +209,10 @@ Some floating-point values also appear to overflow but do not for the CPU as des
 
 Any number that overflows will not be turned into a null value.
 
+Parsing of some values also does not produce bit-for-bit identical results to what the CPU does.
+The differences are within round-off error, except when a value is close enough to overflow to Inf
+or -Inf, in which case a number is returned when the CPU would have returned null.
+
 ### CSV Integer
 
 Any number that overflows will not be turned into a null value.
diff --git a/docs/configs.md b/docs/configs.md
index 7b885774688..9253583dc4c 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -60,6 +60,14 @@ Name | Description | Default Value
 spark.rapids.sql.castStringToInteger.enabled|When set to true, enables casting from strings to integer types (byte, short, int, long) on the GPU. Casting from string to integer types on the GPU returns incorrect results when the string represents a number larger than Long.MaxValue or smaller than Long.MinValue.|false
 spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
 spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
+spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false
+spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false
+spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
+spark.rapids.sql.csv.read.double.enabled|Parsing CSV doubles has some issues at the min and max values for floating point numbers and can be more lenient on parsing inf and -inf values|false
+spark.rapids.sql.csv.read.float.enabled|Parsing CSV floats has some issues at the min and max values for floating point numbers and can be more lenient on parsing inf and -inf values|false
+spark.rapids.sql.csv.read.integer.enabled|Parsing CSV integers is much more lenient and will return 0 for some malformed values instead of null|false
+spark.rapids.sql.csv.read.long.enabled|Parsing CSV longs is much more lenient and will return 0 for some malformed values instead of null|false
+spark.rapids.sql.csv.read.short.enabled|Parsing CSV shorts is much more lenient and will return 0 for some malformed values instead of null|false
 spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
 spark.rapids.sql.decimalType.enabled|Enable decimal type support on the GPU. Decimal support on the GPU is limited to less than 18 digits. This can result in a lot of data movement to and from the GPU, which can slow down processing in some cases.|false
 spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index c0af1aecfe6..6710f6ce46a 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
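For reference, the per-type CSV configs documented in configs.md above can be set like any other Spark conf. The sketch below is illustrative only (spark-shell style, with a placeholder app name); the config keys are the ones introduced by this patch, and the pattern mirrors the settings the patch adds to `MortgageSparkSuite`:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: opt back in to GPU CSV parsing only for the types you
// have verified parse correctly for your data. Only the config keys below come
// from this patch; the builder boilerplate is an assumption.
val spark = SparkSession.builder()
  .appName("csv-read-example")
  .config("spark.rapids.sql.csv.read.integer.enabled", "true")
  .config("spark.rapids.sql.csv.read.long.enabled", "true")
  .config("spark.rapids.sql.csv.read.float.enabled", "true")
  .config("spark.rapids.sql.csv.read.double.enabled", "true")
  .getOrCreate()
```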
@@ -93,37 +93,150 @@ _good_str_schema = StructType([ StructField('Something', StringType())]) +_three_str_schema = StructType([ + StructField('a', StringType()), + StructField('b', StringType()), + StructField('c', StringType())]) -_enable_ts_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true'} +_trucks_schema = StructType([ + StructField('make', StringType()), + StructField('model', StringType()), + StructField('year', IntegerType()), + StructField('price', StringType()), + StructField('comment', StringType())]) -def read_csv_df(data_path, schema, header, sep): - return lambda spark : spark.read\ - .schema(schema)\ - .option('header', header)\ - .option('sep', sep)\ - .csv(data_path) +_bool_schema = StructType([ + StructField('boolean', BooleanType())]) -def read_csv_sql(data_path, schema, header, sep): +_byte_schema = StructType([ + StructField('number', ByteType())]) + +_short_schema = StructType([ + StructField('number', ShortType())]) + +_int_schema = StructType([ + StructField('number', IntegerType())]) + +_long_schema = StructType([ + StructField('number', LongType())]) + +_float_schema = StructType([ + StructField('number', FloatType())]) + +_double_schema = StructType([ + StructField('number', DoubleType())]) + +_number_as_string_schema = StructType([ + StructField('number', StringType())]) + +_empty_byte_schema = StructType([ + StructField('ignored_a', StringType()), + StructField('number', ByteType()), + StructField('ignored_b', StringType())]) + +_empty_short_schema = StructType([ + StructField('ignored_a', StringType()), + StructField('number', ShortType()), + StructField('ignored_b', StringType())]) + +_empty_int_schema = StructType([ + StructField('ignored_a', StringType()), + StructField('number', IntegerType()), + StructField('ignored_b', StringType())]) + +_empty_long_schema = StructType([ + StructField('ignored_a', StringType()), + StructField('number', LongType()), + StructField('ignored_b', StringType())]) + +_empty_float_schema = StructType([ + StructField('ignored_a', StringType()), + StructField('number', FloatType()), + StructField('ignored_b', StringType())]) + +_empty_double_schema = StructType([ + StructField('ignored_a', StringType()), + StructField('number', DoubleType()), + StructField('ignored_b', StringType())]) + +_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true', + 'spark.rapids.sql.csv.read.bool.enabled': 'true', + 'spark.rapids.sql.csv.read.date.enabled': 'true', + 'spark.rapids.sql.csv.read.byte.enabled': 'true', + 'spark.rapids.sql.csv.read.short.enabled': 'true', + 'spark.rapids.sql.csv.read.integer.enabled': 'true', + 'spark.rapids.sql.csv.read.long.enabled': 'true', + 'spark.rapids.sql.csv.read.float.enabled': 'true', + 'spark.rapids.sql.csv.read.double.enabled': 'true', + 'spark.sql.legacy.timeParserPolicy': 'Corrected'} + +def read_csv_df(data_path, schema, options = {}): + def read_impl(spark): + reader = spark.read + if not schema is None: + reader = reader.schema(schema) + for key, value in options.items(): + reader = reader.option(key, value) + return debug_df(reader.csv(data_path)) + return read_impl + +def read_csv_sql(data_path, schema, options = {}): + if not schema is None: + options.update({'schema': schema}) def read_impl(spark): spark.sql('DROP TABLE IF EXISTS `TMP_CSV_TABLE`') - return spark.catalog.createTable('TMP_CSV_TABLE', source='csv', schema=schema, header=str(header), sep=sep, path=data_path) + return spark.catalog.createTable('TMP_CSV_TABLE', source='csv', path=data_path, **options) return 
read_impl @approximate_float -@pytest.mark.parametrize('name,schema,sep,header', [ - ('Acquisition_2007Q3.txt', _acq_schema, '|', False), - ('Performance_2007Q3.txt_0', _perf_schema, '|', False), - pytest.param('ts.csv', _date_schema, ',', False, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/869')), - ('ts.csv', _ts_schema, ',', False), - ('str.csv', _bad_str_schema, ',', True), - ('str.csv', _good_str_schema, ',', True) - ]) +@pytest.mark.parametrize('name,schema,options', [ + ('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}), + ('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}), + pytest.param('ts.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1091')), + pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')), + ('ts.csv', _ts_schema, {}), + ('str.csv', _bad_str_schema, {'header': 'true'}), + ('str.csv', _good_str_schema, {'header': 'true'}), + ('no-comments.csv', _three_str_schema, {}), + ('empty.csv', _three_str_schema, {}), + ('just_comments.csv', _three_str_schema, {'comment': '#'}), + ('trucks.csv', _trucks_schema, {'header': 'true'}), + ('trucks.tsv', _trucks_schema, {'sep': '\t', 'header': 'true'}), + ('trucks-different.csv', _trucks_schema, {'sep': '|', 'header': 'true', 'quote': "'"}), + ('trucks-blank-names.csv', _trucks_schema, {'header': 'true'}), + ('trucks-windows.csv', _trucks_schema, {'header': 'true'}), + ('trucks-empty-values.csv', _trucks_schema, {'header': 'true'}), + ('trucks-extra-columns.csv', _trucks_schema, {'header': 'true'}), + pytest.param('trucks-comments.csv', _trucks_schema, {'header': 'true', 'comment': '~'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2066')), + ('trucks-more-comments.csv', _trucks_schema, {'header': 'true', 'comment': '#'}), + pytest.param('trucks-missing-quotes.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130')), + pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true', 'nullValue': 'null'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2068')), + pytest.param('trucks-null.csv', _trucks_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), + pytest.param('simple_int_values.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), + pytest.param('simple_int_values.csv', _short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), + pytest.param('simple_int_values.csv', _int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), + pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')), + ('simple_int_values.csv', _float_schema, {'header': 'true'}), + ('simple_int_values.csv', _double_schema, {'header': 'true'}), + pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')), + pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, 
marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
+    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
+    pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
+    pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
+    ], ids=idfn)
 @pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
 @pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
-def test_basic_read(std_input_path, name, schema, sep, header, read_func, v1_enabled_list):
-    updated_conf=_enable_ts_conf
+def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled_list):
+    updated_conf=_enable_all_types_conf.copy()
     updated_conf['spark.sql.sources.useV1SourceList']=v1_enabled_list
-    assert_gpu_and_cpu_are_equal_collect(read_func(std_input_path + '/' + name, schema, header, sep),
+    assert_gpu_and_cpu_are_equal_collect(read_func(std_input_path + '/' + name, schema, options),
             conf=updated_conf)
 
 csv_supported_gens = [
@@ -147,7 +260,7 @@ def test_round_trip(spark_tmp_path, data_gen, v1_enabled_list):
     gen = StructGen([('a', data_gen)], nullable=False)
     data_path = spark_tmp_path + '/CSV_DATA'
     schema = gen.data_type
-    updated_conf=_enable_ts_conf
+    updated_conf=_enable_all_types_conf.copy()
     updated_conf['spark.sql.sources.useV1SourceList']=v1_enabled_list
     with_cpu_session(
             lambda spark : gen_df(spark, gen).write.csv(data_path))
@@ -167,14 +280,16 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
     gen = StructGen(gen_list, nullable=False)
     data_path = spark_tmp_path + '/CSV_DATA'
     schema = gen.data_type
-    reader = read_func(data_path, schema, False, ',')
+    updated_conf=_enable_all_types_conf.copy()
+    updated_conf[disable_conf]='false'
+
+    reader = read_func(data_path, schema)
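+    # `disable_conf` was just set to 'false' above, so the CSV scan is expected to fall
+    # back to the CPU FileSourceScanExec, which assert_gpu_fallback_collect verifies below.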
with_cpu_session( lambda spark : gen_df(spark, gen).write.csv(data_path)) assert_gpu_fallback_collect( lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')), 'FileSourceScanExec', - conf={disable_conf: 'false', - "spark.sql.sources.useV1SourceList": "csv"}) + conf=updated_conf) csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM', 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy'] @@ -184,6 +299,8 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list): gen = StructGen([('a', DateGen())], nullable=False) data_path = spark_tmp_path + '/CSV_DATA' schema = gen.data_type + updated_conf=_enable_all_types_conf.copy() + updated_conf['spark.sql.sources.useV1SourceList']=v1_enabled_list with_cpu_session( lambda spark : gen_df(spark, gen).write\ .option('dateFormat', date_format)\ @@ -193,7 +310,7 @@ def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list): .schema(schema)\ .option('dateFormat', date_format)\ .csv(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=updated_conf) csv_supported_ts_parts = ['', # Just the date "'T'HH:mm:ss.SSSXXX", @@ -217,7 +334,7 @@ def test_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_enabled_ lambda spark : gen_df(spark, gen).write\ .option('timestampFormat', full_format)\ .csv(data_path)) - updated_conf=_enable_ts_conf + updated_conf=_enable_all_types_conf.copy() updated_conf['spark.sql.sources.useV1SourceList']=v1_enabled_list assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read\ @@ -236,6 +353,8 @@ def test_input_meta(spark_tmp_path, v1_enabled_list): with_cpu_session( lambda spark : gen_df(spark, gen).write.csv(second_data_path)) data_path = spark_tmp_path + '/CSV_DATA' + updated_conf=_enable_all_types_conf.copy() + updated_conf['spark.sql.sources.useV1SourceList']=v1_enabled_list assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.schema(gen.data_type)\ .csv(data_path)\ @@ -244,7 +363,7 @@ def test_input_meta(spark_tmp_path, v1_enabled_list): 'input_file_name()', 'input_file_block_start()', 'input_file_block_length()'), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=updated_conf) @allow_non_gpu('DataWritingCommandExec') def test_csv_save_as_table_fallback(spark_tmp_path, spark_tmp_table_factory): diff --git a/integration_tests/src/test/resources/date.csv b/integration_tests/src/test/resources/date.csv new file mode 100644 index 00000000000..300b64dcc01 --- /dev/null +++ b/integration_tests/src/test/resources/date.csv @@ -0,0 +1,4 @@ +2020-09-16 +2020-10-16 +2021-09-16 +2020-50-16 diff --git a/integration_tests/src/test/resources/empty.csv b/integration_tests/src/test/resources/empty.csv new file mode 100644 index 00000000000..e69de29bb2d diff --git a/integration_tests/src/test/resources/empty_int_values.csv b/integration_tests/src/test/resources/empty_int_values.csv new file mode 100644 index 00000000000..5e1b2697b5b --- /dev/null +++ b/integration_tests/src/test/resources/empty_int_values.csv @@ -0,0 +1,7 @@ +empty-0space,,end +empty-0space-quoted,"",end +empty-1space, ,end +empty-1space-quoted," ",end +empty-2space, ,end +empty-2space-quoted," ",end +no-null,3,end diff --git a/integration_tests/src/test/resources/ints_with_whitespace.csv b/integration_tests/src/test/resources/ints_with_whitespace.csv new file mode 100644 index 00000000000..df2c30355fe --- /dev/null +++ b/integration_tests/src/test/resources/ints_with_whitespace.csv @@ -0,0 +1,16 @@ +"number" +1 + 2 + 3, 
+4 +5 , +"6" + "7" +"8" +"9" , + 10 + 11 + 12 + "13" , + "14" , +15 diff --git a/integration_tests/src/test/resources/just_comments.csv b/integration_tests/src/test/resources/just_comments.csv new file mode 100644 index 00000000000..e4f9f28f42c --- /dev/null +++ b/integration_tests/src/test/resources/just_comments.csv @@ -0,0 +1,6 @@ +#This,should,look,like,nothing +#COMMENT + +# More Comments + + diff --git a/integration_tests/src/test/resources/nan_and_inf.csv b/integration_tests/src/test/resources/nan_and_inf.csv new file mode 100644 index 00000000000..b2f8f78e751 --- /dev/null +++ b/integration_tests/src/test/resources/nan_and_inf.csv @@ -0,0 +1,8 @@ +"number" +NaN +Inf +-Inf +NAN +nan +INF +-INF diff --git a/integration_tests/src/test/resources/no-comments.csv b/integration_tests/src/test/resources/no-comments.csv new file mode 100644 index 00000000000..e5dcec35cb1 --- /dev/null +++ b/integration_tests/src/test/resources/no-comments.csv @@ -0,0 +1,3 @@ +~1,2,3 +#4,5,6 +7,8,9 diff --git a/integration_tests/src/test/resources/simple_boolean_values.csv b/integration_tests/src/test/resources/simple_boolean_values.csv new file mode 100644 index 00000000000..4b8ec5a4e16 --- /dev/null +++ b/integration_tests/src/test/resources/simple_boolean_values.csv @@ -0,0 +1,9 @@ +"booleans" +true +false + +True +False +TRUE +FALSE +BAD diff --git a/integration_tests/src/test/resources/simple_float_values.csv b/integration_tests/src/test/resources/simple_float_values.csv new file mode 100644 index 00000000000..f7a20131283 --- /dev/null +++ b/integration_tests/src/test/resources/simple_float_values.csv @@ -0,0 +1,27 @@ +"number" +1.042 +-- +bad +"" +, +98.343 +223823.9484 +23848545.0374 +184721.23987223 +3.4028235e+38 +-0.0 +3.4028235E38 +3.4028236E38 +3.4028236e+38 +1.7976931348623157E308 +1.7976931348623157e+308 +1.7976931348623158E308 +1.2e-234 +NAN +nan +NaN +Inf +-Inf +INF +-INF + diff --git a/integration_tests/src/test/resources/simple_int_values.csv b/integration_tests/src/test/resources/simple_int_values.csv new file mode 100644 index 00000000000..1d515dfd2b2 --- /dev/null +++ b/integration_tests/src/test/resources/simple_int_values.csv @@ -0,0 +1,21 @@ +"number" +-1 +0 +1 +127 +128 +-128 +-129 +32767 +-32768 +32768 +-32769 +2147483647 +-2147483648 +2147483648 +-2147483649 +9223372036854775807 +-9223372036854775808 +9223372036854775808 +-9223372036854775809 +18446744073709551615 diff --git a/integration_tests/src/test/resources/trucks-blank-names.csv b/integration_tests/src/test/resources/trucks-blank-names.csv new file mode 100644 index 00000000000..ac438186482 --- /dev/null +++ b/integration_tests/src/test/resources/trucks-blank-names.csv @@ -0,0 +1,6 @@ +"",,year,price,comment,extra +"Ford","F-150","2020","30,135.15","Luxury/Fullsize" + +GMC,Sierra 1500,1997,20000,"No comment", +Chevy,D-Max,2015,"5,000.10" + diff --git a/integration_tests/src/test/resources/trucks-comments.csv b/integration_tests/src/test/resources/trucks-comments.csv new file mode 100644 index 00000000000..de5671525da --- /dev/null +++ b/integration_tests/src/test/resources/trucks-comments.csv @@ -0,0 +1,8 @@ +~ Some metadata +~Comment at the start of the line +make,model,year,price,comment,extra +"Ford","F-150","2020","30,135.15","Luxury/Fullsize" + +~GMC,Sierra 1500,1997,20000,"No comment", +Chevy,D-Max,2015,"5,000.10" +~ Last comment line diff --git a/integration_tests/src/test/resources/trucks-different.csv b/integration_tests/src/test/resources/trucks-different.csv new file mode 100644 index 00000000000..07544634428 --- 
/dev/null +++ b/integration_tests/src/test/resources/trucks-different.csv @@ -0,0 +1,6 @@ +make|model|year|price|comment|extra +'Ford'|'F-150'|'2020'|'30,135.15'|'Luxury/Fullsize' + +GMC|Sierra 1500|1997|20000|'No comment'| +Chevy|D-Max|2015|5,000.10 + diff --git a/integration_tests/src/test/resources/trucks-empty-values.csv b/integration_tests/src/test/resources/trucks-empty-values.csv new file mode 100644 index 00000000000..4de1b14aa79 --- /dev/null +++ b/integration_tests/src/test/resources/trucks-empty-values.csv @@ -0,0 +1,7 @@ + +make,model,year,price,comment,extra +"Ford","","2020",,"Luxury/Fullsize" + +GMC,Sierra 1500,1997,20000,"No comment", +Chevy,D-Max,2015,,"" + diff --git a/integration_tests/src/test/resources/trucks-extra-columns.csv b/integration_tests/src/test/resources/trucks-extra-columns.csv new file mode 100644 index 00000000000..162cbb543e5 --- /dev/null +++ b/integration_tests/src/test/resources/trucks-extra-columns.csv @@ -0,0 +1,7 @@ + +make,model,year,price,comment,extra +"Ford","F-150","2020","30,135.15","Luxury/Fullsize",,,,,, + +GMC,Sierra 1500,1997,20000,"No comment",,, +Chevy,D-Max,2015,"5,000.10",,,,,,, + diff --git a/integration_tests/src/test/resources/trucks-missing-quotes.csv b/integration_tests/src/test/resources/trucks-missing-quotes.csv new file mode 100644 index 00000000000..ef9b7bf86ff --- /dev/null +++ b/integration_tests/src/test/resources/trucks-missing-quotes.csv @@ -0,0 +1,7 @@ + +make,model,year,price,comment,extra +"Ford,F-150,2020,30,135.15,Luxury/Fullsize + +GMC,Sierra 1500,1997,20000,No comment", +"Chevy,"D-Max",2015,5,000.10 + diff --git a/integration_tests/src/test/resources/trucks-more-comments.csv b/integration_tests/src/test/resources/trucks-more-comments.csv new file mode 100644 index 00000000000..d9fc7f6bca8 --- /dev/null +++ b/integration_tests/src/test/resources/trucks-more-comments.csv @@ -0,0 +1,8 @@ +# Some metadata +#Comment at the start of the line +make,model,year,price,comment,extra +"Ford","F-150","2020","30,135.15","Luxury/Fullsize" +#GMC,Sierra 1500,1997,20000,"No comment", + +Chevy,D-Max,2015,"5,000.10", +# Last comment line diff --git a/integration_tests/src/test/resources/trucks-null.csv b/integration_tests/src/test/resources/trucks-null.csv new file mode 100644 index 00000000000..32db5ccd5c3 --- /dev/null +++ b/integration_tests/src/test/resources/trucks-null.csv @@ -0,0 +1,7 @@ + +make,model,year,price,comment,extra +"Ford",null,"2020","30,135.15",null + +GMC,Sierra 1500,1997,20000,"No comment", +Chevy,D-Max,null,"5,000.10" + diff --git a/integration_tests/src/test/resources/trucks-windows.csv b/integration_tests/src/test/resources/trucks-windows.csv new file mode 100644 index 00000000000..be087f56145 --- /dev/null +++ b/integration_tests/src/test/resources/trucks-windows.csv @@ -0,0 +1,7 @@ + +make,model,year,price,comment,extra +"Ford","F-150","2020","30,135.15","Luxury/Fullsize" + +GMC,Sierra 1500,1997,20000,"No comment", +Chevy,D-Max,2015,"5,000.10" + diff --git a/integration_tests/src/test/resources/trucks.csv b/integration_tests/src/test/resources/trucks.csv new file mode 100644 index 00000000000..76dc01e9736 --- /dev/null +++ b/integration_tests/src/test/resources/trucks.csv @@ -0,0 +1,7 @@ + +make,model,year,price,comment,extra +"Ford","F-150","2020","30,135.15","Luxury/Fullsize" + +GMC,Sierra 1500,1997,20000,"No comment", +Chevy,D-Max,2015,"5,000.10" + diff --git a/integration_tests/src/test/resources/trucks.tsv b/integration_tests/src/test/resources/trucks.tsv new file mode 100644 index 
00000000000..ccbb54154d8 --- /dev/null +++ b/integration_tests/src/test/resources/trucks.tsv @@ -0,0 +1,5 @@ +make model year price comment extra +"Ford" "F-150" "2020" "30,135.15" "Luxury/Fullsize" +GMC Sierra 1500 1997 20000 "No comment" +Chevy D-Max 2015 5,000.10 + diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala index 877d8c20187..25ecf072904 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/mortgage/MortgageSparkSuite.scala @@ -42,6 +42,13 @@ class MortgageSparkSuite extends FunSuite { .config("spark.rapids.sql.test.enabled", false) .config("spark.rapids.sql.incompatibleOps.enabled", true) .config("spark.rapids.sql.hasNans", false) + .config("spark.rapids.sql.csv.read.date.enabled", true) + .config("spark.rapids.sql.csv.read.byte.enabled", true) + .config("spark.rapids.sql.csv.read.short.enabled", true) + .config("spark.rapids.sql.csv.read.integer.enabled", true) + .config("spark.rapids.sql.csv.read.long.enabled", true) + .config("spark.rapids.sql.csv.read.float.enabled", true) + .config("spark.rapids.sql.csv.read.double.enabled", true) val rapidsShuffle = ShimLoader.getSparkShims.getRapidsShuffleManagerClass val prop = System.getProperty("rapids.shuffle.manager.override", "false") if (prop.equalsIgnoreCase("true")) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index e8be113c6d8..d13fcf7fbe7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DateType, DecimalType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -173,46 +173,94 @@ object GpuCSVScan { meta.willNotWorkOnGpu("GpuCSVScan does not support modified escape chars") } - // TODO charToEscapeQuoteEscaping??? 
+    if (parsedOptions.charToEscapeQuoteEscaping.isDefined) {
+      meta.willNotWorkOnGpu("GPU CSV Parsing does not support charToEscapeQuoteEscaping")
+    }
 
     if (StandardCharsets.UTF_8.name() != parsedOptions.charset &&
         StandardCharsets.US_ASCII.name() != parsedOptions.charset) {
       meta.willNotWorkOnGpu("GpuCSVScan only supports UTF8 encoded data")
     }
 
-    if (parsedOptions.ignoreLeadingWhiteSpaceInRead) {
-      // TODO need to fix this (or at least verify that it is doing the right thing)
-      meta.willNotWorkOnGpu("GpuCSVScan does not support ignoring leading white space")
+    // TODO parsedOptions.ignoreLeadingWhiteSpaceInRead cudf always does this, but not for strings
+    // TODO parsedOptions.ignoreTrailingWhiteSpaceInRead cudf always does this, but not for strings
+    // TODO parsedOptions.multiLine cudf always does this, but it is not the default and it is not
+    //  consistent
+
+    if (parsedOptions.lineSeparator.getOrElse("\n") != "\n") {
+      meta.willNotWorkOnGpu("GpuCSVScan only supports \"\\n\" as a line separator")
     }
 
-    if (parsedOptions.ignoreTrailingWhiteSpaceInRead) {
-      // TODO need to fix this (or at least verify that it is doing the right thing)
-      meta.willNotWorkOnGpu("GpuCSVScan does not support ignoring trailing white space")
+    if (parsedOptions.parseMode != PermissiveMode) {
+      meta.willNotWorkOnGpu("GpuCSVScan only supports Permissive CSV parsing")
     }
 
-    if (parsedOptions.multiLine) {
-      meta.willNotWorkOnGpu("GpuCSVScan does not support multi-line")
+    // TODO parsedOptions.nanValue This is here by default so we should support it, but cudf
+    // makes it null https://github.com/NVIDIA/spark-rapids/issues/125
+    parsedOptions.positiveInf.toLowerCase() match {
+      case "inf" | "+inf" | "infinity" | "+infinity" =>
+      case _ =>
+        meta.willNotWorkOnGpu(s"the positive infinity value '${parsedOptions.positiveInf}'" +
+          s" is not supported")
     }
+    parsedOptions.negativeInf.toLowerCase() match {
+      case "-inf" | "-infinity" =>
+      case _ =>
+        meta.willNotWorkOnGpu(s"the negative infinity value '${parsedOptions.negativeInf}'" +
+          s" is not supported")
+    }
+    // parsedOptions.maxCharsPerColumn does not impact the final output it is a performance
+    // improvement if you know the maximum size
 
-    if (parsedOptions.lineSeparator.getOrElse("\n") != "\n") {
-      meta.willNotWorkOnGpu("GpuCSVScan only supports \"\\n\" as a line separator")
+    // parsedOptions.maxColumns was originally a performance optimization but is not used any more
+
+    if (readSchema.map(_.dataType).contains(DateType)) {
+      if (!meta.conf.isCsvDateReadEnabled) {
+        meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading dates. " +
+          s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DATES} to true.")
+      }
+      if (!supportedDateFormats.contains(parsedOptions.dateFormat)) {
+        meta.willNotWorkOnGpu(s"the date format '${parsedOptions.dateFormat}' is not supported")
+      }
     }
 
-    if (parsedOptions.parseMode != PermissiveMode) {
-      meta.willNotWorkOnGpu("GpuCSVScan only supports Permissive CSV parsing")
+    if (!meta.conf.isCsvBoolReadEnabled && readSchema.map(_.dataType).contains(BooleanType)) {
+      meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading boolean. " +
+        s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BOOLS} to true.")
+    }
+
+    if (!meta.conf.isCsvByteReadEnabled && readSchema.map(_.dataType).contains(ByteType)) {
+      meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading bytes. 
" + + s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_BYTES} to true.") + } + + if (!meta.conf.isCsvShortReadEnabled && readSchema.map(_.dataType).contains(ShortType)) { + meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading shorts. " + + s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_SHORTS} to true.") } - // TODO parsedOptions.nanValue This is here by default so we need to be able to support it - // TODO parsedOptions.positiveInf This is here by default so we need to be able to support it - // TODO parsedOptions.negativeInf This is here by default so we need to be able to support it + if (!meta.conf.isCsvIntReadEnabled && readSchema.map(_.dataType).contains(IntegerType)) { + meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading integers. " + + s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_INTEGERS} to true.") + } + + if (!meta.conf.isCsvLongReadEnabled && readSchema.map(_.dataType).contains(LongType)) { + meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading longs. " + + s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_LONGS} to true.") + } + + if (!meta.conf.isCsvFloatReadEnabled && readSchema.map(_.dataType).contains(FloatType)) { + meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading floats. " + + s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_FLOATS} to true.") + } - if (readSchema.map(_.dataType).contains(DateType) && - !supportedDateFormats.contains(parsedOptions.dateFormat)) { - meta.willNotWorkOnGpu(s"the date format '${parsedOptions.dateFormat}' is not supported'") + if (!meta.conf.isCsvDoubleReadEnabled && readSchema.map(_.dataType).contains(DoubleType)) { + meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading doubles. " + + s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DOUBLES} to true.") } if (readSchema.map(_.dataType).contains(TimestampType)) { - if (!meta.conf.isCsvTimestampEnabled) { + if (!meta.conf.isCsvTimestampReadEnabled) { meta.willNotWorkOnGpu("GpuCSVScan does not support parsing timestamp types. To " + s"enable it please set ${RapidsConf.ENABLE_CSV_TIMESTAMPS} to true.") } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 753dc8c4185..7ac472d1ee4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -607,13 +607,6 @@ object RapidsConf { .booleanConf .createWithDefault(false) - val ENABLE_CSV_TIMESTAMPS = conf("spark.rapids.sql.csvTimestamps.enabled") - .doc("When set to true, enables the CSV parser to read timestamps. The default output " + - "format for Spark includes a timezone at the end. Anything except the UTC timezone is not " + - "supported. Timestamps after 2038 and before 1902 are also not supported.") - .booleanConf - .createWithDefault(false) - // FILE FORMATS val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled") .doc("When set to false disables all parquet input and output acceleration") @@ -716,6 +709,60 @@ object RapidsConf { .booleanConf .createWithDefault(true) + // TODO should we change this config? + val ENABLE_CSV_TIMESTAMPS = conf("spark.rapids.sql.csvTimestamps.enabled") + .doc("When set to true, enables the CSV parser to read timestamps. The default output " + + "format for Spark includes a timezone at the end. Anything except the UTC timezone is " + + "not supported. 
Timestamps after 2038 and before 1902 are also not supported.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_DATES = conf("spark.rapids.sql.csv.read.date.enabled")
+    .doc("Parsing invalid CSV dates produces different results from Spark")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_BOOLS = conf("spark.rapids.sql.csv.read.bool.enabled")
+    .doc("Parsing an invalid CSV boolean value produces true instead of null")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_BYTES = conf("spark.rapids.sql.csv.read.byte.enabled")
+    .doc("Parsing CSV bytes is much more lenient and will return 0 for some " +
+      "malformed values instead of null")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_SHORTS = conf("spark.rapids.sql.csv.read.short.enabled")
+    .doc("Parsing CSV shorts is much more lenient and will return 0 for some " +
+      "malformed values instead of null")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_INTEGERS = conf("spark.rapids.sql.csv.read.integer.enabled")
+    .doc("Parsing CSV integers is much more lenient and will return 0 for some " +
+      "malformed values instead of null")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_LONGS = conf("spark.rapids.sql.csv.read.long.enabled")
+    .doc("Parsing CSV longs is much more lenient and will return 0 for some " +
+      "malformed values instead of null")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_FLOATS = conf("spark.rapids.sql.csv.read.float.enabled")
+    .doc("Parsing CSV floats has some issues at the min and max values for floating " +
+      "point numbers and can be more lenient on parsing inf and -inf values")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_READ_CSV_DOUBLES = conf("spark.rapids.sql.csv.read.double.enabled")
+    .doc("Parsing CSV doubles has some issues at the min and max values for floating " +
+      "point numbers and can be more lenient on parsing inf and -inf values")
+    .booleanConf
+    .createWithDefault(false)
+
   // INTERNAL TEST AND DEBUG CONFIGS
 
   val TEST_CONF = conf("spark.rapids.sql.test.enabled")
@@ -1215,9 +1262,25 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isCastFloatToIntegralTypesEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES)
 
-  lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING)
+  lazy val isCsvTimestampReadEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS)
+
+  lazy val isCsvDateReadEnabled: Boolean = get(ENABLE_READ_CSV_DATES)
+
+  lazy val isCsvBoolReadEnabled: Boolean = get(ENABLE_READ_CSV_BOOLS)
+
+  lazy val isCsvByteReadEnabled: Boolean = get(ENABLE_READ_CSV_BYTES)
+
+  lazy val isCsvShortReadEnabled: Boolean = get(ENABLE_READ_CSV_SHORTS)
 
-  lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS)
+  lazy val isCsvIntReadEnabled: Boolean = get(ENABLE_READ_CSV_INTEGERS)
+
+  lazy val isCsvLongReadEnabled: Boolean = get(ENABLE_READ_CSV_LONGS)
+
+  lazy val isCsvFloatReadEnabled: Boolean = get(ENABLE_READ_CSV_FLOATS)
+
+  lazy val isCsvDoubleReadEnabled: Boolean = get(ENABLE_READ_CSV_DOUBLES)
+
+  lazy val isCastDecimalToStringEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_STRING)
 
   lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)
 
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala
index ce9f17349ad..b1014c4c3a9 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala
+++ 
b/tests/src/test/scala/com/nvidia/spark/rapids/CsvScanSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,22 +26,28 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { frame => frame.select(col("c_string"), col("c_int"), col("c_timestamp")) } - testSparkResultsAreEqual("Test CSV splits with chunks", floatCsvDf, conf= new SparkConf().set( - RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) { + testSparkResultsAreEqual("Test CSV splits with chunks", floatCsvDf, + conf = new SparkConf() + .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1") + .set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")) { frame => frame.select(col("floats")) } testSparkResultsAreEqual( "Test CSV count chunked by rows", intsFromCsv, - conf=new SparkConf().set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) { + conf = new SparkConf() + .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1") + .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) { frameCount } testSparkResultsAreEqual( "Test CSV count chunked by bytes", intsFromCsv, - conf=new SparkConf().set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "0")) { + conf = new SparkConf() + .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "0") + .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) { frameCount } @@ -51,7 +57,9 @@ class CsvScanSuite extends SparkQueryCompareTestSuite { ALLOW_NON_GPU_testSparkResultsAreEqual("Test CSV inferred schema", intsFromCsvInferredSchema, Seq("FileSourceScanExec", "FilterExec", "CollectLimitExec", "GreaterThan", "Length", "StringTrim", "LocalTableScanExec", "DeserializeToObjectExec", - "Invoke", "AttributeReference", "Literal")) { + "Invoke", "AttributeReference", "Literal"), + conf = new SparkConf() + .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")) { frame => frame.select(col("*")) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index ec5a6f7755a..230f5ac9323 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -64,7 +64,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { test("require single batch") { - val conf = makeBatchedBytes(1) + val conf = makeBatchedBytes(1, enableCsvConf()) .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1") .set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "1") .set(RapidsConf.GPU_BATCH_SIZE_BYTES.key, "1") diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala index 43a70d10859..bd44f425891 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -29,8 +29,9 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, DataTypes} class HashAggregatesSuite extends SparkQueryCompareTestSuite { - private val floatAggConf: SparkConf = new SparkConf().set( - RapidsConf.ENABLE_FLOAT_AGG.key, "true").set(RapidsConf.HAS_NANS.key, "false") + private val floatAggConf: SparkConf = enableCsvConf() + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true") + .set(RapidsConf.HAS_NANS.key, "false") def replaceHashAggMode(mode: String, conf: SparkConf = new SparkConf()): SparkConf = { // configures whether Plugin will replace certain aggregate exec nodes @@ -203,6 +204,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { IGNORE_ORDER_testSparkResultsAreEqual( "test hash agg with shuffle", longsFromCSVDf, + conf = enableCsvConf(), repart = 2) { frame => frame.groupBy(col("longs")).agg(sum(col("more_longs"))) } @@ -211,7 +213,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "test hash agg with Single partitioning", longsFromCSVDf, repart = 2, - conf = new SparkConf().set("spark.sql.shuffle.partitions", "1")) { + conf = enableCsvConf().set("spark.sql.shuffle.partitions", "1")) { frame => { frame.agg(count("*")) } @@ -221,7 +223,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "test hash agg with Single partitioning with partition sort", longsFromCSVDf, repart = 2, - conf = new SparkConf().set("spark.sql.shuffle.partitions", "1"), sortBeforeRepart = true) { + conf = enableCsvConf().set("spark.sql.shuffle.partitions", "1"), sortBeforeRepart = true) { frame => { frame.agg(count("*")) } @@ -247,7 +249,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "reduction aggs", longsCsvDf, // All the literals get turned into doubles, so we need to support avg in those cases - conf = makeBatchedBytes(3).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { + conf = makeBatchedBytes(3, enableCsvConf()) + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.agg( (max("longs") - min("more_longs")) * lit(5), sum("longs"), @@ -256,7 +259,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { avg(col("more_longs") * lit("10"))) } - IGNORE_ORDER_testSparkResultsAreEqual("distinct", datesCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("distinct", datesCsvDf, conf = enableCsvConf()) { frame => frame.distinct() } @@ -338,7 +341,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { frame => frame.agg(avg(lit(null)),avg(lit(null))) } - IGNORE_ORDER_testSparkResultsAreEqual("distinct should not reorder columns", intsFromCsv) { + IGNORE_ORDER_testSparkResultsAreEqual("distinct should not reorder columns", + intsFromCsv, conf = enableCsvConf()) { frame => frame.distinct() } @@ -402,6 +406,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { IGNORE_ORDER_testSparkResultsAreEqual( "test count, sum, max, min with shuffle", longsFromCSVDf, + conf = enableCsvConf(), repart = 2) { frame => frame.groupBy(col("more_longs")).agg( count("*"), @@ -412,7 +417,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "float basic aggregates group by string literal", - floatCsvDf) { + floatCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(lit("2019-02-10")).agg( min(col("floats")) + lit(123), sum(col("more_floats") + lit(123.0)), @@ -426,7 +432,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "float basic aggregates group by float and string 
literal", - floatCsvDf) { + floatCsvDf, + conf = enableCsvConf()) { frame => { val feature_window_end_time = date_format(lit("2018-02-01"), "yyyy-MM-dd HH:mm:ss") val frameWithCol = frame.withColumn("timestamp_lit", feature_window_end_time) @@ -456,7 +463,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { } } - FLOAT_TEST_testSparkResultsAreEqual("float basic aggregates group by floats", floatCsvDf) { + FLOAT_TEST_testSparkResultsAreEqual("float basic aggregates group by floats", floatCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy("floats").agg( lit(456f), min(col("floats")) + lit(123), @@ -469,7 +477,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { count("*")) } - FLOAT_TEST_testSparkResultsAreEqual("float basic aggregates group by more_floats", floatCsvDf) { + FLOAT_TEST_testSparkResultsAreEqual("float basic aggregates group by more_floats", floatCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy("floats").agg( lit(456f), min(col("floats")) + lit(123), @@ -485,7 +494,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "partial on gpu: float basic aggregates group by more_floats", floatCsvDf, - conf = replaceHashAggMode("partial"), + conf = replaceHashAggMode("partial", enableCsvConf()), execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Literal", "Min", "Sum", "Max", "Average", "Add", "Multiply", "Subtract", "Cast", "Count")) { @@ -504,7 +513,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "final on gpu: float basic aggregates group by more_floats", floatCsvDf, - conf = replaceHashAggMode("final"), + conf = replaceHashAggMode("final", enableCsvConf()), execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Literal", "Min", "Sum", "Max", "Average", "Add", "Multiply", "Subtract", "Cast", "Count", "KnownFloatingPointNormalized", "NormalizeNaNAndZero")) { @@ -523,7 +532,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "nullable float basic aggregates group by more_floats", nullableFloatCsvDf, - conf = makeBatchedBytes(3)) { + conf = makeBatchedBytes(3, enableCsvConf())) { frame => frame.groupBy("more_floats").agg( lit(456f), min(col("floats")) + lit(123), @@ -540,7 +549,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "shorts basic aggregates group by more_shorts", shortsFromCsv, // All the literals get turned into doubles, so we need to support avg in those cases - conf = makeBatchedBytes(3).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { + conf = makeBatchedBytes(3, enableCsvConf()) + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy("more_shorts").agg( lit(456), min(col("shorts")) + lit(123), @@ -557,7 +567,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "long basic aggregates group by longs", longsCsvDf, // All the literals get turned into doubles, so we need to support avg in those cases - conf = makeBatchedBytes(3).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { + conf = makeBatchedBytes(3, enableCsvConf()) + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy("longs").agg( lit(456f), min(col("longs")) + lit(123), @@ -574,7 +585,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "long basic aggregates group by more_longs", longsCsvDf, // All the literals get turned into doubles, 
so we need to support avg in those cases - conf = makeBatchedBytes(3).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { + conf = makeBatchedBytes(3, enableCsvConf()) + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy("more_longs").agg( lit(456f), min(col("longs")) + lit(123), @@ -591,7 +603,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "ints basic aggregates group by ints", intCsvDf, // All the literals get turned into doubles, so we need to support avg in those cases - conf = makeBatchedBytes(3).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { + conf = makeBatchedBytes(3, enableCsvConf()) + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy("ints").agg( lit(456f), min(col("ints")) + lit(123), @@ -608,7 +621,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { "ints basic aggregates group by more_ints", intCsvDf, // All the literals get turned into doubles, so we need to support avg in those cases - conf = makeBatchedBytes(3).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { + conf = makeBatchedBytes(3, enableCsvConf()) + .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy("more_ints").agg( lit(456f), min(col("ints")) + lit(123), @@ -624,7 +638,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "doubles basic aggregates group by doubles", doubleCsvDf, - conf = makeBatchedBytes(3)) { + conf = makeBatchedBytes(3, enableCsvConf())) { frame => frame.groupBy("doubles").agg( lit(456f), min(col("doubles")) + lit(123), @@ -640,7 +654,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "doubles basic aggregates group by more_doubles", doubleCsvDf, - conf = makeBatchedBytes(3)) { + conf = makeBatchedBytes(3, enableCsvConf())) { frame => frame.groupBy("more_doubles").agg( lit(456f), min(col("doubles")) + lit(123), @@ -655,21 +669,25 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { IGNORE_ORDER_testSparkResultsAreEqual( "sum(longs) multi group by longs, more_longs", - longsCsvDf) { + longsCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy("longs", "more_longs").agg( sum("longs"), count("*")) } // misc aggregation tests - testSparkResultsAreEqual("sum(ints) group by literal", intCsvDf) { + testSparkResultsAreEqual("sum(ints) group by literal", intCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(lit(1)).agg(sum("ints")) } - IGNORE_ORDER_testSparkResultsAreEqual("sum(ints) group by dates", datesCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("sum(ints) group by dates", datesCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy("dates").sum("ints") } - IGNORE_ORDER_testSparkResultsAreEqual("max(ints) group by month", datesCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("max(ints) group by month", datesCsvDf, + conf = enableCsvConf()) { frame => frame.withColumn("monthval", month(col("dates"))) .groupBy(col("monthval")) .agg(max("ints").as("max_ints_by_month")) @@ -678,6 +696,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "sum(floats) group by more_floats 2 partitions", floatCsvDf, + conf = enableCsvConf(), repart = 2) { frame => frame.groupBy("more_floats").sum("floats") } @@ -685,6 +704,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "avg(floats) group by more_floats 4 partitions", floatCsvDf, + conf = enableCsvConf(), repart = 4) { frame => 
frame.groupBy("more_floats").avg("floats") } @@ -692,6 +712,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "avg(floats),count(floats) group by more_floats 4 partitions", floatCsvDf, + conf = enableCsvConf(), repart = 4) { frame => frame .groupBy("more_floats") @@ -722,23 +743,28 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { max(col("ints") + col("more_ints")), lit(1), min("ints")) } - IGNORE_ORDER_testSparkResultsAreEqual("grouping expressions", longsCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("grouping expressions", longsCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(col("more_longs") + lit(10)).agg(min("longs")) } - IGNORE_ORDER_testSparkResultsAreEqual("grouping expressions 2", longsCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("grouping expressions 2", longsCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(col("more_longs") + col("longs")).agg(min("longs")) } - IGNORE_ORDER_testSparkResultsAreEqual("first ignoreNulls=false", intCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("first ignoreNulls=false", intCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(col("more_ints")).agg(first("ints", false)) } - IGNORE_ORDER_testSparkResultsAreEqual("last ignoreNulls=false", intCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("last ignoreNulls=false", intCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(col("more_ints")).agg(last("ints", false)) } - IGNORE_ORDER_testSparkResultsAreEqual("first/last ints column", intCsvDf) { + IGNORE_ORDER_testSparkResultsAreEqual("first/last ints column", intCsvDf, + conf = enableCsvConf()) { frame => frame.groupBy(col("more_ints")).agg( first("ints", ignoreNulls = false), last("ints", ignoreNulls = false)) @@ -835,11 +861,13 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { maxStringLen = 2)) } - FLOAT_TEST_testSparkResultsAreEqual("empty df: reduction count", floatCsvDf) { + FLOAT_TEST_testSparkResultsAreEqual("empty df: reduction count", floatCsvDf, + conf = enableCsvConf()) { frame => frame.filter("floats > 10000000.0").agg(count("*")) } - FLOAT_TEST_testSparkResultsAreEqual("empty df: reduction aggs", floatCsvDf) { + FLOAT_TEST_testSparkResultsAreEqual("empty df: reduction aggs", floatCsvDf, + conf = enableCsvConf()) { frame => frame.filter("floats > 10000000.0").agg( lit(456f), min(col("floats")) + lit(123), @@ -856,19 +884,20 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { count("*")) } - FLOAT_TEST_testSparkResultsAreEqual("empty df: grouped count", floatCsvDf) { + FLOAT_TEST_testSparkResultsAreEqual("empty df: grouped count", floatCsvDf, + conf = enableCsvConf()) { frame => frame.filter("floats > 10000000.0").groupBy("floats").agg(count("*")) } FLOAT_TEST_testSparkResultsAreEqual("partial on gpu: empty df: grouped count", floatCsvDf, - conf = replaceHashAggMode("partial"), + conf = replaceHashAggMode("partial", enableCsvConf()), execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Count", "Literal")) { frame => frame.filter("floats > 10000000.0").groupBy("floats").agg(count("*")) } FLOAT_TEST_testSparkResultsAreEqual("final on gpu: empty df: grouped count", floatCsvDf, - conf = replaceHashAggMode("final"), + conf = replaceHashAggMode("final", enableCsvConf()), execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Count", "Literal", "KnownFloatingPointNormalized", "NormalizeNaNAndZero")) { frame => 
frame.filter("floats > 10000000.0").groupBy("floats").agg(count("*")) @@ -876,7 +905,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "empty df: float basic aggregates group by floats", - floatCsvDf) { + floatCsvDf, + conf = enableCsvConf()) { frame => frame.filter("floats > 10000000.0").groupBy("floats").agg( lit(456f), min(col("floats")) + lit(123), @@ -894,7 +924,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "partial on gpu: empty df: float basic aggregates group by floats", floatCsvDf, - conf = replaceHashAggMode("partial"), + conf = replaceHashAggMode("partial", enableCsvConf()), execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Literal", "Min", "Sum", "Max", "Average", "Add", "Multiply", "Subtract", "Cast", "First", "Last", "Count")) { @@ -915,7 +945,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { FLOAT_TEST_testSparkResultsAreEqual( "final on gpu: empty df: float basic aggregates group by floats", floatCsvDf, - conf = replaceHashAggMode("final"), + conf = replaceHashAggMode("final", enableCsvConf()), execsAllowedNonGpu = Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Literal", "Min", "Sum", "Max", "Average", "Add", "Multiply", "Subtract", "Cast", "First", "Last", "Count", "KnownFloatingPointNormalized", @@ -934,7 +964,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { count("*")) } - testSparkResultsAreEqual("Agg expression with filter", longsFromCSVDf) { + testSparkResultsAreEqual("Agg expression with filter", longsFromCSVDf, + conf = enableCsvConf()) { frame => frame.selectExpr("count(1) filter (where longs > 20)") } @@ -1073,9 +1104,9 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { max("more_longs")) } { (_, gpuPlan) => checkExecPlan(gpuPlan) } - private val partialOnlyConf = replaceHashAggMode("partial").set( + private val partialOnlyConf = replaceHashAggMode("partial", enableCsvConf()).set( RapidsConf.ENABLE_FLOAT_AGG.key, "true").set(RapidsConf.HAS_NANS.key, "false") - private val finalOnlyConf = replaceHashAggMode("final").set( + private val finalOnlyConf = replaceHashAggMode("final", enableCsvConf()).set( RapidsConf.ENABLE_FLOAT_AGG.key, "true").set(RapidsConf.HAS_NANS.key, "false") IGNORE_ORDER_ALLOW_NON_GPU_testSparkResultsAreEqualWithCapture( @@ -1502,7 +1533,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { res } - testSparkResultsAreEqual("Sum with filter", longsFromCSVDf) { + testSparkResultsAreEqual("Sum with filter", longsFromCSVDf, conf = enableCsvConf()) { frame => val res = frame.selectExpr("sum(longs) filter (where longs < 10)") res } @@ -1555,7 +1586,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { res } - testSparkResultsAreEqual("Max with filter", longsFromCSVDf) { + testSparkResultsAreEqual("Max with filter", longsFromCSVDf, conf = enableCsvConf()) { frame => val res = frame.selectExpr("max(longs) filter (where longs < 10)") res } @@ -1585,7 +1616,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { nanDf, Seq("HashAggregateExec", "AggregateExpression", "AttributeReference", "Alias", "Max"), - conf = new SparkConf()) { + conf = enableCsvConf()) { frame => frame.agg(max("doubles")) } { (_, gpuPlan) => { // verify nothing ran on the gpu @@ -1600,7 +1631,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { nanDf, Seq("HashAggregateExec", 
"AggregateExpression", "AttributeReference", "Alias", "Min"), - conf = new SparkConf()) { + conf = enableCsvConf()) { frame => frame.agg(min("doubles")) } { (_, gpuPlan) => { // verify nothing ran on the gpu @@ -1613,7 +1644,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { IGNORE_ORDER_testSparkResultsAreEqual( testName = "Test NormalizeNansAndZeros(Float)", floatWithDifferentKindsOfNansAndZeros, - conf = new SparkConf() + conf = enableCsvConf() .set(RapidsConf.HAS_NANS.key, "false") .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy(col("float")).agg(sum(col("int"))) @@ -1622,7 +1653,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite { IGNORE_ORDER_testSparkResultsAreEqual( testName = "Test NormalizeNansAndZeros(Double)", doubleWithDifferentKindsOfNansAndZeros, - conf = new SparkConf() + conf = enableCsvConf() .set(RapidsConf.HAS_NANS.key, "false") .set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) { frame => frame.groupBy(col("double")).agg(sum(col("int"))) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/LimitExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/LimitExecSuite.scala index 8cc335fb3af..04157aa7fe0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/LimitExecSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/LimitExecSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ class LimitExecSuite extends SparkQueryCompareTestSuite { /** CollectLimitExec is off by default, turn it on for tests */ def enableCollectLimitExec(conf: SparkConf = new SparkConf()): SparkConf = { - conf.set("spark.rapids.sql.exec.CollectLimitExec", "true") + enableCsvConf().set("spark.rapids.sql.exec.CollectLimitExec", "true") } testSparkResultsAreEqual("limit more than rows", intCsvDf, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala index 62c77be49ed..2d0c3c90f8a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -43,7 +43,7 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite {
         assert(d < 1.0)
         assert(d >= 0.0)
       })
-    })
+    }, conf = enableCsvConf())
   }
 
   testSparkResultsAreEqual("Test literal values in select", mixedFloatDf) {
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ScalarSubquerySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ScalarSubquerySuite.scala
index fc2206d278e..39132c9955e 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ScalarSubquerySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ScalarSubquerySuite.scala
@@ -30,6 +30,7 @@ class ScalarSubquerySuite extends SparkQueryCompareTestSuite {
   }
 
   testSparkResultsAreEqual("Uncorrelated Scalar Subquery", longsFromCSVDf,
+    conf = enableCsvConf(),
     repart = 0) {
     frame => {
       frame.createOrReplaceTempView("table")
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SortExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SortExecSuite.scala
index d897d3abf0f..b2c32564b20 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SortExecSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SortExecSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019, 2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,12 +35,12 @@ class SortExecSuite extends SparkQueryCompareTestSuite {
   }
 
   testSparkResultsAreEqual("GpuRangePartitioning with numparts > numvalues v1", longsCsvDf,
-    conf=makeBatchedBytes(1)) {
+    conf = makeBatchedBytes(1, enableCsvConf())) {
     df => df.filter(df.col("longs").gt(1)).sort(df.col("longs"))
   }
 
   testSparkResultsAreEqual("GpuRangePartitioning with numparts > numvalues v2", longsCsvDf,
-    conf=new SparkConf().set("spark.sql.shuffle.partitions", "4"), repart = 4) {
+    conf = enableCsvConf().set("spark.sql.shuffle.partitions", "4"), repart = 4) {
     df => df.filter(df.col("longs").lt(-800)).sort(df.col("longs"))
   }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index 41c7e4dc4b0..3be5caac1c1 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -148,6 +148,17 @@ object SparkSessionHolder extends Logging {
 trait SparkQueryCompareTestSuite extends FunSuite with Arm {
   import SparkSessionHolder.withSparkSession
 
+  def enableCsvConf(): SparkConf = {
+    new SparkConf()
+      .set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true")
+      .set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true")
+      .set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true")
+      .set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")
+      .set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true")
+      .set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
+      .set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
+  }
+
   // @see java.lang.Float#intBitsToFloat
   //
   // If the argument is any value in the range `0x7f800001` through `0x7fffffff` or
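
Note on the pattern running through the hunks above: with CSV parsing now off by default, every test
that builds its input from a *.csv fixture opts back in by passing a conf built from enableCsvConf(),
layering any extra settings on top of it. A minimal sketch of a new test following that pattern (the
test name and aggregation here are illustrative only, not part of this patch):

  // Illustrative only: opt the CSV-backed fixture back in via enableCsvConf(),
  // then layer float aggregation on top because avg produces doubles.
  IGNORE_ORDER_testSparkResultsAreEqual("avg(longs) group by more_longs", longsCsvDf,
    conf = enableCsvConf().set(RapidsConf.ENABLE_FLOAT_AGG.key, "true")) {
    frame => frame.groupBy("more_longs").agg(avg("longs"))
  }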
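
The other recurring change is that conf-building helpers such as makeBatchedBytes and
replaceHashAggMode now take the conf they build on as an argument, so enableCsvConf() can be threaded
through them instead of each helper starting from a fresh SparkConf. Their implementations are not
part of this patch; a rough sketch of that shape, with the config key shown only as an assumption:

  // Hypothetical helper shape: start from a caller-supplied base conf (for
  // example enableCsvConf()) and layer this helper's setting on top, so the
  // CSV enable flags and the batching setting end up in a single SparkConf.
  def makeBatchedBytes(bytes: Int, conf: SparkConf = new SparkConf()): SparkConf = {
    conf.set("spark.rapids.sql.batchSizeBytes", bytes.toString) // key assumed for illustration
  }

Used as in the hunks above: conf = makeBatchedBytes(3, enableCsvConf()).set(RapidsConf.ENABLE_FLOAT_AGG.key, "true").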