From 3e7277442e1f50623ad53067c0272e2874c3e90e Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 4 Aug 2023 15:20:43 -0700 Subject: [PATCH] Drop the in-range check at INT96 output path [databricks] (#8824) - Remove INT96 check for being in the Long.Min:Max range. - Update docs and tests Fixes #8625 Depends on rapidsai/cudf#8070, rapidsai/cudf#13776 Verified with the [notebook](https://github.com/gerashegalov/rapids-shell/blob/9cb4598b0feba7b71eb91f396f4b577bbb0dec00/src/jupyter/int96.ipynb) and ```bash JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 TEST_PARALLEL=0 SPARK_HOME=~/dist/spark-3.3.2-bin-hadoop3 ./integration_tests/run_pyspark_from_build.sh -k test_fix_for_int96_overflow ``` Signed-off-by: Gera Shegalov --- docs/compatibility.md | 82 +++++++++---------- .../src/main/python/delta_lake_write_test.py | 4 +- .../src/main/python/parquet_write_test.py | 57 ++++--------- .../spark/rapids/GpuParquetFileFormat.scala | 30 +------ 4 files changed, 55 insertions(+), 118 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 0d2e11633de..b5cb01757dd 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -30,7 +30,7 @@ not true for sorting. For all versions of the plugin `-0.0` == `0.0` for sorting Spark's sorting is typically a [stable](https://en.wikipedia.org/wiki/Sorting_algorithm#Stability) sort. Sort stability cannot be guaranteed in distributed work loads because the order in which upstream data arrives to a task is not guaranteed. Sort stability is only -guaranteed in one situation which is reading and sorting data from a file using a single +guaranteed in one situation which is reading and sorting data from a file using a single task/partition. The RAPIDS Accelerator does an unstable [out of core](https://en.wikipedia.org/wiki/External_memory_algorithm) sort by default. This simply means that the sort algorithm allows for spilling parts of the data if it is larger than @@ -44,7 +44,7 @@ in the future to allow for a spillable stable sort. For most basic floating-point operations like addition, subtraction, multiplication, and division the plugin will produce a bit for bit identical result as Spark does. For other functions like -`sin`, `cos`, etc. the output may be different, but within the rounding error inherent in +`sin`, `cos`, etc. the output may be different, but within the rounding error inherent in floating-point calculations. The ordering of operations to calculate the value may differ between the underlying JVM implementation used by the CPU and the C++ standard library implementation used by the GPU. @@ -65,7 +65,7 @@ conditions within the computation itself the result may not be the same each tim run. This is inherent in how the plugin speeds up the calculations and cannot be "fixed." If a query joins on a floating point value, which is not wise to do anyways, and the value is the result of a floating point aggregation then the join may fail to work properly with the plugin but would have -worked with plain Spark. Starting from 22.06 this is behavior is enabled by default but can be disabled with +worked with plain Spark. Starting from 22.06 this is behavior is enabled by default but can be disabled with the config [`spark.rapids.sql.variableFloatAgg.enabled`](configs.md#sql.variableFloatAgg.enabled). @@ -201,7 +201,7 @@ Hive has some limitations in what decimal values it can parse. The GPU kernels t to parse decimal values do not have the same limitations. 
This means that there are times when the CPU version would return a null for an input value, but the GPU version will return a value. This typically happens for numbers with large negative exponents where -the GPU will return `0` and Hive will return `null`. +the GPU will return `0` and Hive will return `null`. See https://github.com/NVIDIA/spark-rapids/issues/7246 ## ORC @@ -221,7 +221,7 @@ The plugin supports reading `uncompressed`, `snappy`, `zlib` and `zstd` ORC file ### Push Down Aggregates for ORC Spark-3.3.0+ pushes down certain aggregations (`MIN`/`MAX`/`COUNT`) into ORC when the user-config -`spark.sql.orc.aggregatePushdown` is set to true. +`spark.sql.orc.aggregatePushdown` is set to true. By enabling this feature, aggregate query performance will improve as it takes advantage of the statistics information. @@ -229,7 +229,7 @@ statistics information. Spark ORC reader/writer assumes that all ORC files must have valid column statistics. This assumption deviates from the [ORC-specification](https://orc.apache.org/specification) which states that statistics -are optional. +are optional. When a Spark-3.3.0+ job reads an ORC file with empty file-statistics, it fails while throwing the following runtime exception: @@ -244,7 +244,7 @@ E at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggIn ``` The Spark community is planning to work on a runtime fallback to read from actual rows when ORC -file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.org/jira/browse/SPARK-34960)). +file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.org/jira/browse/SPARK-34960)). **Limitations With RAPIDS** @@ -253,7 +253,7 @@ RAPIDS does not support whole file statistics in ORC file in releases prior to r *Writing ORC Files* If you are using release prior to release 22.06 where CUDF does not support writing file statistics, then the ORC files -written by the GPU are incompatible with the optimization causing an ORC read-job to fail as described above. +written by the GPU are incompatible with the optimization causing an ORC read-job to fail as described above. In order to prevent job failures in releases prior to release 22.06, `spark.sql.orc.aggregatePushdown` should be disabled while reading ORC files that were written by the GPU. @@ -261,7 +261,7 @@ while reading ORC files that were written by the GPU. To take advantage of the aggregate optimization, the plugin falls back to the CPU as it is a meta data only query. As long as the ORC file has valid statistics (written by the CPU), then the pushing down aggregates to the ORC layer -should be successful. +should be successful. Otherwise, reading an ORC file written by the GPU requires `aggregatePushdown` to be disabled. ## Parquet @@ -277,14 +277,6 @@ values occurring before the transition between the Julian and Gregorian calendar When writing `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` is currently ignored as described [here](https://github.com/NVIDIA/spark-rapids/issues/144). -When `spark.sql.parquet.outputTimestampType` is set to `INT96`, the timestamps will overflow and -result in an `IllegalArgumentException` thrown, if any value is before -September 21, 1677 12:12:43 AM or it is after April 11, 2262 11:47:17 PM. 
To get around this -issue, turn off the ParquetWriter acceleration for timestamp columns by either setting -`spark.rapids.sql.format.parquet.writer.int96.enabled` to false or -set `spark.sql.parquet.outputTimestampType` to `TIMESTAMP_MICROS` or `TIMESTAMP_MILLIS` to by --pass the issue entirely. - The plugin supports reading `uncompressed`, `snappy`, `gzip` and `zstd` Parquet files and writing `uncompressed` and `snappy` Parquet files. At this point, the plugin does not have the ability to fall back to the CPU when reading an unsupported compression format, and will error out in that @@ -292,8 +284,8 @@ case. ## JSON -The JSON format read is a very experimental feature which is expected to have some issues, so we disable -it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.json.enabled` and +The JSON format read is a very experimental feature which is expected to have some issues, so we disable +it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.json.enabled` and `spark.rapids.sql.format.json.read.enabled`. Reading input containing invalid JSON format (in any row) will throw runtime exception. @@ -347,10 +339,10 @@ scala> df.selectExpr("from_json(value, 'MAP')").show() Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float). -Prior to Spark 3.3.0, reading JSON strings such as `"+Infinity"` when specifying that the data type is `FloatType` +Prior to Spark 3.3.0, reading JSON strings such as `"+Infinity"` when specifying that the data type is `FloatType` or `DoubleType` caused these values to be parsed even when `allowNonNumericNumbers` is set to false. Also, Spark -versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` representations of infinity and did not -support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is +versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` representations of infinity and did not +support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is consistent with the behavior in Spark 3.3.0 and later. Another limitation of the GPU JSON reader is that it will parse strings containing non-string boolean or numeric values where @@ -358,32 +350,32 @@ Spark will treat them as invalid inputs and will just return `null`. ### JSON Timestamps -There is currently no support for reading numeric values as timestamps and null values are returned instead -([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast +There is currently no support for reading numeric values as timestamps and null values are returned instead +([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast to timestamp. ### JSON Schema discovery -Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU +Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU handles schema discovery and there is no GPU acceleration of this. By default Spark will read/parse the entire dataset to determine the schema. This means that some options/errors which are ignored by the GPU may still result in an exception if used with schema discovery. 
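A minimal sketch of sidestepping schema discovery by supplying an explicit schema (the column
names and path below are illustrative, not taken from this repository):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// With an explicit schema, Spark does not make a CPU pass over the data to infer one,
// so none of the inference-only errors described above can be triggered by the read.
val jsonSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType)))
val df = spark.read.schema(jsonSchema).json("/path/to/data.json")
```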
### JSON options -Spark supports passing options to the JSON parser when reading a dataset. In most cases if the RAPIDS Accelerator -sees one of these options that it does not support it will fall back to the CPU. In some cases we do not. The +Spark supports passing options to the JSON parser when reading a dataset. In most cases if the RAPIDS Accelerator +sees one of these options that it does not support it will fall back to the CPU. In some cases we do not. The following options are documented below. -- `allowNumericLeadingZeros` - Allows leading zeros in numbers (e.g. 00012). By default this is set to false. -When it is false Spark throws an exception if it encounters this type of number. The RAPIDS Accelerator +- `allowNumericLeadingZeros` - Allows leading zeros in numbers (e.g. 00012). By default this is set to false. +When it is false Spark throws an exception if it encounters this type of number. The RAPIDS Accelerator strips off leading zeros from all numbers and this config has no impact on it. -- `allowUnquotedControlChars` - Allows JSON Strings to contain unquoted control characters (ASCII characters with -value less than 32, including tab and line feed characters) or not. By default this is set to false. If the schema -is provided while reading JSON file, then this flag has no impact on the RAPIDS Accelerator as it always allows -unquoted control characters but Spark reads these entries incorrectly as null. However, if the schema is not provided -and when the option is false, then RAPIDS Accelerator's behavior is same as Spark where an exception is thrown +- `allowUnquotedControlChars` - Allows JSON Strings to contain unquoted control characters (ASCII characters with +value less than 32, including tab and line feed characters) or not. By default this is set to false. If the schema +is provided while reading JSON file, then this flag has no impact on the RAPIDS Accelerator as it always allows +unquoted control characters but Spark reads these entries incorrectly as null. However, if the schema is not provided +and when the option is false, then RAPIDS Accelerator's behavior is same as Spark where an exception is thrown as discussed in `JSON Schema discovery` section. - `allowNonNumericNumbers` - Allows `NaN` and `Infinity` values to be parsed (note that these are not valid numeric @@ -395,7 +387,7 @@ Spark version 3.3.0 and later. ## Avro The Avro format read is a very experimental feature which is expected to have some issues, so we disable -it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.avro.enabled` and +it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.avro.enabled` and `spark.rapids.sql.format.avro.read.enabled`. Currently, the GPU accelerated Avro reader doesn't support reading the Avro version 1.2 files. @@ -444,7 +436,7 @@ The following regular expression patterns are not yet supported on the GPU and w - Line and string anchors are not supported by `string_split` and `str_to_map` - Lazy quantifiers, such as `a*?` - Possessive quantifiers, such as `a*+` -- Character classes that use union, intersection, or subtraction semantics, such as `[a-d[m-p]]`, `[a-z&&[def]]`, +- Character classes that use union, intersection, or subtraction semantics, such as `[a-d[m-p]]`, `[a-z&&[def]]`, or `[a-z&&[^bc]]` - Empty groups: `()` @@ -561,10 +553,10 @@ on the GPU without requiring any additional settings. 
- `MM-dd-yyyy` - `MMyyyy` -Valid Spark date/time formats that do not appear in the list above may also be supported but have not been +Valid Spark date/time formats that do not appear in the list above may also be supported but have not been extensively tested and may produce different results compared to the CPU. Known issues include: -- Valid dates and timestamps followed by trailing characters (including whitespace) may be parsed to non-null +- Valid dates and timestamps followed by trailing characters (including whitespace) may be parsed to non-null values on GPU where Spark would treat the data as invalid and return null To attempt to use other formats on the GPU, set @@ -591,7 +583,7 @@ Formats that contain any of the following words are unsupported and will fall ba With timeParserPolicy set to `LEGACY` and [`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled) -set to `true`, and `spark.sql.ansi.enabled` set to `false`, the following formats are supported but not +set to `true`, and `spark.sql.ansi.enabled` set to `false`, the following formats are supported but not guaranteed to produce the same results as the CPU: - `dd-MM-yyyy` @@ -604,7 +596,7 @@ guaranteed to produce the same results as the CPU: LEGACY timeParserPolicy support has the following limitations when running on the GPU: - Only 4 digit years are supported -- The proleptic Gregorian calendar is used instead of the hybrid Julian+Gregorian calendar +- The proleptic Gregorian calendar is used instead of the hybrid Julian+Gregorian calendar that Spark uses in legacy mode ## Formatting dates and timestamps as strings @@ -645,7 +637,7 @@ leads to restrictions: * Float values cannot be larger than `1e18` or smaller than `-1e18` after conversion. * The results produced by GPU slightly differ from the default results of Spark. -Starting from 22.06 this conf is enabled, to disable this operation on the GPU when using Spark 3.1.0 or +Starting from 22.06 this conf is enabled, to disable this operation on the GPU when using Spark 3.1.0 or later, set [`spark.rapids.sql.castFloatToDecimal.enabled`](configs.md#sql.castFloatToDecimal.enabled) to `false` @@ -679,7 +671,7 @@ represents any number in the following ranges. In both cases the GPU returns `Do default behavior in Apache Spark is to return `+Infinity` and `-Infinity`, respectively. - `1.7976931348623158E308 <= x < 1.7976931348623159E308` -- `-1.7976931348623159E308 < x <= -1.7976931348623158E308` +- `-1.7976931348623159E308 < x <= -1.7976931348623158E308` Also, the GPU does not support casting from strings containing hex values. @@ -707,10 +699,10 @@ The following formats/patterns are supported on the GPU. Timezone of UTC is assu ### String to Timestamp -To allow casts from string to timestamp on the GPU, enable the configuration property +To allow casts from string to timestamp on the GPU, enable the configuration property [`spark.rapids.sql.castStringToTimestamp.enabled`](configs.md#sql.castStringToTimestamp.enabled). -Casting from string to timestamp currently has the following limitations. +Casting from string to timestamp currently has the following limitations. | Format or Pattern | Supported on GPU? | | ------------------------------------------------------------------- | ------------------| @@ -743,7 +735,7 @@ values. ConstantFolding is an operator optimization rule in Catalyst that replaces expressions that can be statically evaluated with their equivalent literal values. 
The RAPIDS Accelerator relies -on constant folding and parts of the query will not be accelerated if +on constant folding and parts of the query will not be accelerated if `org.apache.spark.sql.catalyst.optimizer.ConstantFolding` is excluded as a rule. ### long/double to Timestamp diff --git a/integration_tests/src/main/python/delta_lake_write_test.py b/integration_tests/src/main/python/delta_lake_write_test.py index 222f8f26426..2dceb0fe275 100644 --- a/integration_tests/src/main/python/delta_lake_write_test.py +++ b/integration_tests/src/main/python/delta_lake_write_test.py @@ -23,7 +23,7 @@ from data_gen import * from conftest import is_databricks_runtime from marks import * -from parquet_write_test import limited_timestamp, parquet_part_write_gens, parquet_write_gens_list, writer_confs +from parquet_write_test import parquet_part_write_gens, parquet_write_gens_list, writer_confs from pyspark.sql.types import * from spark_session import is_before_spark_320, is_before_spark_330, is_databricks122_or_later, with_cpu_session @@ -769,7 +769,7 @@ def test_delta_write_optimized_supported_types(spark_tmp_path): "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true" }) simple_gens = [ byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, date_gen, limited_timestamp() ] + string_gen, boolean_gen, date_gen, TimestampGen() ] genlist = simple_gens + \ [ StructGen([("child" + str(i), gen) for i, gen in enumerate(simple_gens)]) ] + \ [ StructGen([("x", StructGen([("y", int_gen)]))]) ] diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index e9d31ec3158..3518b7d9517 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -40,24 +40,12 @@ writer_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', 'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'} - -def limited_timestamp(nullable=True): - return TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(2262, 4, 11, tzinfo=timezone.utc), - nullable=nullable) - -# TODO - we are limiting the INT96 values, see https://github.com/rapidsai/cudf/issues/8070 -def limited_int96(): - return TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(2262, 4, 11, tzinfo=timezone.utc)) - parquet_basic_gen =[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, date_gen, - # we are limiting TimestampGen to avoid overflowing the INT96 value - # see https://github.com/rapidsai/cudf/issues/8070 - limited_timestamp(), binary_gen] + string_gen, boolean_gen, date_gen, TimestampGen(), binary_gen] parquet_basic_map_gens = [MapGen(f(nullable=False), f()) for f in [ BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen, - limited_timestamp]] + [simple_string_to_string_map_gen, + TimestampGen]] + [simple_string_to_string_map_gen, MapGen(DecimalGen(20, 2, nullable=False), decimal_gen_128bit), # python is not happy with binary values being keys of a map MapGen(StringGen("a{1,5}", nullable=False), binary_gen)] @@ -110,13 +98,13 @@ def test_write_round_trip(spark_tmp_path, parquet_gens): all_nulls_map_gen = SetValuesGen(MapType(StringType(), StringType()), [None]) all_empty_map_gen = SetValuesGen(MapType(StringType(), StringType()), [{}]) -par_write_odd_empty_strings_gens_sample = [all_nulls_string_gen, - 
empty_or_null_string_gen, +par_write_odd_empty_strings_gens_sample = [all_nulls_string_gen, + empty_or_null_string_gen, all_empty_string_gen, all_nulls_array_gen, all_empty_array_gen, all_array_empty_string_gen, - mixed_empty_nulls_array_gen, + mixed_empty_nulls_array_gen, mixed_empty_nulls_map_gen, all_nulls_map_gen, all_empty_map_gen] @@ -131,9 +119,9 @@ def test_write_round_trip_corner(spark_tmp_path, par_gen): data_path) @pytest.mark.parametrize('parquet_gens', [[ - limited_timestamp(), - ArrayGen(limited_timestamp(), max_length=10), - MapGen(limited_timestamp(nullable=False), limited_timestamp())]], ids=idfn) + TimestampGen(), + ArrayGen(TimestampGen(), max_length=10), + MapGen(TimestampGen(nullable=False), TimestampGen())]], ids=idfn) @pytest.mark.parametrize('ts_type', parquet_ts_write_options) def test_timestamp_write_round_trip(spark_tmp_path, parquet_gens, ts_type): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] @@ -149,9 +137,7 @@ def test_timestamp_write_round_trip(spark_tmp_path, parquet_gens, ts_type): @pytest.mark.parametrize('ts_rebase', ['CORRECTED']) @ignore_order def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): - # we are limiting TimestampGen to avoid overflowing the INT96 value - # see https://github.com/rapidsai/cudf/issues/8070 - gen = TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(2262, 4, 11, tzinfo=timezone.utc)) + gen = TimestampGen() data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( lambda spark, path: unary_op_df(spark, gen).write.parquet(path), @@ -167,9 +153,7 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): # Some file systems have issues with UTF8 strings so to help the test pass even there StringGen('(\\w| ){0,50}'), boolean_gen, date_gen, - # we are limiting TimestampGen to avoid overflowing the INT96 value - # see https://github.com/rapidsai/cudf/issues/8070 - TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(2262, 4, 11, tzinfo=timezone.utc))] + TimestampGen()] # There are race conditions around when individual files are read in for partitioned data @ignore_order @@ -185,16 +169,6 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen): data_path, conf=writer_confs) -# we are limiting TimestampGen to avoid overflowing the INT96 value -# see https://github.com/rapidsai/cudf/issues/8070 -@pytest.mark.parametrize('data_gen', [TimestampGen(end=datetime(1677, 9, 22, tzinfo=timezone.utc)), - TimestampGen(start=datetime(2262, 4, 11, tzinfo=timezone.utc))], ids=idfn) -def test_catch_int96_overflow(spark_tmp_path, data_gen): - data_path = spark_tmp_path + '/PARQUET_DATA' - confs = copy_and_update(writer_confs, {'spark.sql.parquet.outputTimestampType': 'INT96'}) - assert_spark_exception(lambda: with_gpu_session( - lambda spark: unary_op_df(spark, data_gen).coalesce(1).write.parquet(data_path), conf=confs), "org.apache.spark.SparkException: Job aborted.") - @pytest.mark.skipif(is_spark_340_or_later() or is_databricks122_or_later(), reason="`WriteFilesExec` is only supported in Spark 340+") @pytest.mark.parametrize('data_gen', [TimestampGen()], ids=idfn) @@ -292,10 +266,9 @@ def writeParquetUpgradeCatchException(spark, df, data_path, spark_tmp_table_fact df.coalesce(1).write.format("parquet").mode('overwrite').option("path", data_path).saveAsTable(spark_tmp_table_factory.get()) assert e_info.match(r".*SparkUpgradeException.*") -# TODO - we are limiting the INT96 values, see 
https://github.com/rapidsai/cudf/issues/8070 -@pytest.mark.parametrize('ts_write_data_gen', - [('INT96', TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))), - ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))), +@pytest.mark.parametrize('ts_write_data_gen', + [('INT96', TimestampGen()), + ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))), ('TIMESTAMP_MILLIS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc)))]) @pytest.mark.parametrize('rebase', ["CORRECTED","EXCEPTION"]) def test_ts_write_fails_datetime_exception(spark_tmp_path, ts_write_data_gen, spark_tmp_table_factory, rebase): @@ -489,8 +462,8 @@ def generate_map_with_empty_validity(spark, path): lambda spark, path: spark.read.parquet(path), data_path) -@pytest.mark.parametrize('ts_write_data_gen', [('INT96', limited_int96()), - ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc))), +@pytest.mark.parametrize('ts_write_data_gen', [('INT96', TimestampGen()), + ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc))), ('TIMESTAMP_MILLIS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc)))]) @pytest.mark.parametrize('date_time_rebase_write', ["CORRECTED"]) @pytest.mark.parametrize('date_time_rebase_read', ["EXCEPTION", "CORRECTED"]) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 71d47e99196..eb2ac1a78a3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -20,7 +20,7 @@ import ai.rapids.cudf._ import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray -import com.nvidia.spark.rapids.shims.{ParquetFieldIdShims, ParquetTimestampNTZShims, SparkShimImpl} +import com.nvidia.spark.rapids.shims._ import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext} import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel @@ -341,38 +341,10 @@ class GpuParquetWriter( // included in Spark's `TimestampType`. 
case (cv, _) if cv.getType.isTimestampType && cv.getType != DType.TIMESTAMP_DAYS => val typeMillis = ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString - val typeInt96 = ParquetOutputTimestampType.INT96.toString - outputTimestampType match { case `typeMillis` if cv.getType != DType.TIMESTAMP_MILLISECONDS => cv.castTo(DType.TIMESTAMP_MILLISECONDS) - case `typeInt96` => - val inRange = withResource(Scalar.fromLong(Long.MaxValue / 1000)) { upper => - withResource(Scalar.fromLong(Long.MinValue / 1000)) { lower => - withResource(cv.bitCastTo(DType.INT64)) { int64 => - withResource(int64.greaterOrEqualTo(upper)) { a => - withResource(int64.lessOrEqualTo(lower)) { b => - a.or(b) - } - } - } - } - } - val anyInRange = withResource(inRange)(_.any()) - withResource(anyInRange) { _ => - require(!(anyInRange.isValid && anyInRange.getBoolean), - // Its the writer's responsibility to close the input batch when this - // exception is thrown. - "INT96 column contains one " + - "or more values that can overflow and will result in data " + - "corruption. Please set " + - "`spark.rapids.sql.format.parquet.writer.int96.enabled` to false " + - "so we can fallback on CPU for writing parquet but still take " + - "advantage of parquet read on the GPU.") - } - cv.copyToColumnVector() /* the input is unchanged */ - // Here the value of `outputTimestampType` should be `TIMESTAMP_MICROS` case _ => cv.copyToColumnVector() /* the input is unchanged */ }
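With the guard removed, only the `TIMESTAMP_MILLIS` arm above still casts the column; `INT96` and
`TIMESTAMP_MICROS` columns are handed to the writer unchanged. A minimal spark-shell sketch of what
this change enables, mirroring the intent of `test_fix_for_int96_overflow` (the output path and the
timestamp value are illustrative, and a session with the plugin enabled is assumed):

```scala
import java.sql.Timestamp
import spark.implicits._

// A timestamp past April 11, 2262 used to trip the removed in-range check on the GPU
// INT96 write path; it should now round-trip through Parquet without an exception.
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
val df = Seq(Timestamp.valueOf("9999-12-31 23:59:59")).toDF("ts")
df.coalesce(1).write.mode("overwrite").parquet("/tmp/int96_overflow_check")
spark.read.parquet("/tmp/int96_overflow_check").show(false)
```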