Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop the in-range check at INT96 output path [databricks] #8824

Merged
merged 15 commits into from
Aug 4, 2023
Merged
82 changes: 37 additions & 45 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ not true for sorting. For all versions of the plugin `-0.0` == `0.0` for sorting
Spark's sorting is typically a [stable](https://en.wikipedia.org/wiki/Sorting_algorithm#Stability)
sort. Sort stability cannot be guaranteed in distributed work loads because the order in which
upstream data arrives to a task is not guaranteed. Sort stability is only
guaranteed in one situation which is reading and sorting data from a file using a single
guaranteed in one situation which is reading and sorting data from a file using a single
task/partition. The RAPIDS Accelerator does an unstable
[out of core](https://en.wikipedia.org/wiki/External_memory_algorithm) sort by default. This
simply means that the sort algorithm allows for spilling parts of the data if it is larger than
Expand All @@ -44,7 +44,7 @@ in the future to allow for a spillable stable sort.

For most basic floating-point operations like addition, subtraction, multiplication, and division
the plugin will produce a bit for bit identical result as Spark does. For other functions like
`sin`, `cos`, etc. the output may be different, but within the rounding error inherent in
`sin`, `cos`, etc. the output may be different, but within the rounding error inherent in
floating-point calculations. The ordering of operations to calculate the value may differ between the
underlying JVM implementation used by the CPU and the C++ standard library implementation used by
the GPU.
Expand All @@ -65,7 +65,7 @@ conditions within the computation itself the result may not be the same each tim
run. This is inherent in how the plugin speeds up the calculations and cannot be "fixed." If a query
joins on a floating point value, which is not wise to do anyways, and the value is the result of a
floating point aggregation then the join may fail to work properly with the plugin but would have
worked with plain Spark. Starting from 22.06 this is behavior is enabled by default but can be disabled with
worked with plain Spark. Starting from 22.06 this is behavior is enabled by default but can be disabled with
the config
[`spark.rapids.sql.variableFloatAgg.enabled`](configs.md#sql.variableFloatAgg.enabled).

Expand Down Expand Up @@ -201,7 +201,7 @@ Hive has some limitations in what decimal values it can parse. The GPU kernels t
to parse decimal values do not have the same limitations. This means that there are times
when the CPU version would return a null for an input value, but the GPU version will
return a value. This typically happens for numbers with large negative exponents where
the GPU will return `0` and Hive will return `null`.
the GPU will return `0` and Hive will return `null`.
See https://github.com/NVIDIA/spark-rapids/issues/7246

## ORC
Expand All @@ -221,15 +221,15 @@ The plugin supports reading `uncompressed`, `snappy`, `zlib` and `zstd` ORC file
### Push Down Aggregates for ORC

Spark-3.3.0+ pushes down certain aggregations (`MIN`/`MAX`/`COUNT`) into ORC when the user-config
`spark.sql.orc.aggregatePushdown` is set to true.
`spark.sql.orc.aggregatePushdown` is set to true.
By enabling this feature, aggregate query performance will improve as it takes advantage of the
statistics information.

**Caution**

Spark ORC reader/writer assumes that all ORC files must have valid column statistics. This assumption
deviates from the [ORC-specification](https://orc.apache.org/specification) which states that statistics
are optional.
are optional.
When a Spark-3.3.0+ job reads an ORC file with empty file-statistics, it fails while throwing the following
runtime exception:

Expand All @@ -244,7 +244,7 @@ E at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggIn
```

The Spark community is planning to work on a runtime fallback to read from actual rows when ORC
file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.org/jira/browse/SPARK-34960)).
file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.org/jira/browse/SPARK-34960)).

**Limitations With RAPIDS**

Expand All @@ -253,15 +253,15 @@ RAPIDS does not support whole file statistics in ORC file in releases prior to r
*Writing ORC Files*

If you are using release prior to release 22.06 where CUDF does not support writing file statistics, then the ORC files
written by the GPU are incompatible with the optimization causing an ORC read-job to fail as described above.
written by the GPU are incompatible with the optimization causing an ORC read-job to fail as described above.
In order to prevent job failures in releases prior to release 22.06, `spark.sql.orc.aggregatePushdown` should be disabled
while reading ORC files that were written by the GPU.

*Reading ORC Files*

To take advantage of the aggregate optimization, the plugin falls back to the CPU as it is a meta data only query.
As long as the ORC file has valid statistics (written by the CPU), then the pushing down aggregates to the ORC layer
should be successful.
should be successful.
Otherwise, reading an ORC file written by the GPU requires `aggregatePushdown` to be disabled.

## Parquet
Expand All @@ -277,23 +277,15 @@ values occurring before the transition between the Julian and Gregorian calendar
When writing `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` is currently ignored as described
[here](https://github.com/NVIDIA/spark-rapids/issues/144).

When `spark.sql.parquet.outputTimestampType` is set to `INT96`, the timestamps will overflow and
result in an `IllegalArgumentException` thrown, if any value is before
September 21, 1677 12:12:43 AM or it is after April 11, 2262 11:47:17 PM. To get around this
issue, turn off the ParquetWriter acceleration for timestamp columns by either setting
`spark.rapids.sql.format.parquet.writer.int96.enabled` to false or
set `spark.sql.parquet.outputTimestampType` to `TIMESTAMP_MICROS` or `TIMESTAMP_MILLIS` to by
-pass the issue entirely.

The plugin supports reading `uncompressed`, `snappy`, `gzip` and `zstd` Parquet files and writing
`uncompressed` and `snappy` Parquet files. At this point, the plugin does not have the ability to
fall back to the CPU when reading an unsupported compression format, and will error out in that
case.

## JSON

The JSON format read is a very experimental feature which is expected to have some issues, so we disable
it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.json.enabled` and
The JSON format read is a very experimental feature which is expected to have some issues, so we disable
it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.json.enabled` and
`spark.rapids.sql.format.json.read.enabled`.

Reading input containing invalid JSON format (in any row) will throw runtime exception.
Expand Down Expand Up @@ -347,43 +339,43 @@ scala> df.selectExpr("from_json(value, 'MAP<STRING,STRING>')").show()

Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).

Prior to Spark 3.3.0, reading JSON strings such as `"+Infinity"` when specifying that the data type is `FloatType`
Prior to Spark 3.3.0, reading JSON strings such as `"+Infinity"` when specifying that the data type is `FloatType`
or `DoubleType` caused these values to be parsed even when `allowNonNumericNumbers` is set to false. Also, Spark
versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` representations of infinity and did not
support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is
versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` representations of infinity and did not
support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is
consistent with the behavior in Spark 3.3.0 and later.

Another limitation of the GPU JSON reader is that it will parse strings containing non-string boolean or numeric values where
Spark will treat them as invalid inputs and will just return `null`.

### JSON Timestamps

There is currently no support for reading numeric values as timestamps and null values are returned instead
([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast
There is currently no support for reading numeric values as timestamps and null values are returned instead
([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to read as longs and then cast
to timestamp.

### JSON Schema discovery

Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU
Spark SQL can automatically infer the schema of a JSON dataset if schema is not provided explicitly. The CPU
handles schema discovery and there is no GPU acceleration of this. By default Spark will read/parse the entire
dataset to determine the schema. This means that some options/errors which are ignored by the GPU may still
result in an exception if used with schema discovery.

### JSON options

Spark supports passing options to the JSON parser when reading a dataset. In most cases if the RAPIDS Accelerator
sees one of these options that it does not support it will fall back to the CPU. In some cases we do not. The
Spark supports passing options to the JSON parser when reading a dataset. In most cases if the RAPIDS Accelerator
sees one of these options that it does not support it will fall back to the CPU. In some cases we do not. The
following options are documented below.

- `allowNumericLeadingZeros` - Allows leading zeros in numbers (e.g. 00012). By default this is set to false.
When it is false Spark throws an exception if it encounters this type of number. The RAPIDS Accelerator
- `allowNumericLeadingZeros` - Allows leading zeros in numbers (e.g. 00012). By default this is set to false.
When it is false Spark throws an exception if it encounters this type of number. The RAPIDS Accelerator
strips off leading zeros from all numbers and this config has no impact on it.

- `allowUnquotedControlChars` - Allows JSON Strings to contain unquoted control characters (ASCII characters with
value less than 32, including tab and line feed characters) or not. By default this is set to false. If the schema
is provided while reading JSON file, then this flag has no impact on the RAPIDS Accelerator as it always allows
unquoted control characters but Spark reads these entries incorrectly as null. However, if the schema is not provided
and when the option is false, then RAPIDS Accelerator's behavior is same as Spark where an exception is thrown
- `allowUnquotedControlChars` - Allows JSON Strings to contain unquoted control characters (ASCII characters with
value less than 32, including tab and line feed characters) or not. By default this is set to false. If the schema
is provided while reading JSON file, then this flag has no impact on the RAPIDS Accelerator as it always allows
unquoted control characters but Spark reads these entries incorrectly as null. However, if the schema is not provided
and when the option is false, then RAPIDS Accelerator's behavior is same as Spark where an exception is thrown
as discussed in `JSON Schema discovery` section.

- `allowNonNumericNumbers` - Allows `NaN` and `Infinity` values to be parsed (note that these are not valid numeric
Expand All @@ -395,7 +387,7 @@ Spark version 3.3.0 and later.
## Avro

The Avro format read is a very experimental feature which is expected to have some issues, so we disable
it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.avro.enabled` and
it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.avro.enabled` and
`spark.rapids.sql.format.avro.read.enabled`.

Currently, the GPU accelerated Avro reader doesn't support reading the Avro version 1.2 files.
Expand Down Expand Up @@ -444,7 +436,7 @@ The following regular expression patterns are not yet supported on the GPU and w
- Line and string anchors are not supported by `string_split` and `str_to_map`
- Lazy quantifiers, such as `a*?`
- Possessive quantifiers, such as `a*+`
- Character classes that use union, intersection, or subtraction semantics, such as `[a-d[m-p]]`, `[a-z&&[def]]`,
- Character classes that use union, intersection, or subtraction semantics, such as `[a-d[m-p]]`, `[a-z&&[def]]`,
or `[a-z&&[^bc]]`
- Empty groups: `()`

Expand Down Expand Up @@ -561,10 +553,10 @@ on the GPU without requiring any additional settings.
- `MM-dd-yyyy`
- `MMyyyy`

Valid Spark date/time formats that do not appear in the list above may also be supported but have not been
Valid Spark date/time formats that do not appear in the list above may also be supported but have not been
extensively tested and may produce different results compared to the CPU. Known issues include:

- Valid dates and timestamps followed by trailing characters (including whitespace) may be parsed to non-null
- Valid dates and timestamps followed by trailing characters (including whitespace) may be parsed to non-null
values on GPU where Spark would treat the data as invalid and return null

To attempt to use other formats on the GPU, set
Expand All @@ -591,7 +583,7 @@ Formats that contain any of the following words are unsupported and will fall ba

With timeParserPolicy set to `LEGACY` and
[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled)
set to `true`, and `spark.sql.ansi.enabled` set to `false`, the following formats are supported but not
set to `true`, and `spark.sql.ansi.enabled` set to `false`, the following formats are supported but not
guaranteed to produce the same results as the CPU:

- `dd-MM-yyyy`
Expand All @@ -604,7 +596,7 @@ guaranteed to produce the same results as the CPU:
LEGACY timeParserPolicy support has the following limitations when running on the GPU:

- Only 4 digit years are supported
- The proleptic Gregorian calendar is used instead of the hybrid Julian+Gregorian calendar
- The proleptic Gregorian calendar is used instead of the hybrid Julian+Gregorian calendar
that Spark uses in legacy mode

## Formatting dates and timestamps as strings
Expand Down Expand Up @@ -645,7 +637,7 @@ leads to restrictions:
* Float values cannot be larger than `1e18` or smaller than `-1e18` after conversion.
* The results produced by GPU slightly differ from the default results of Spark.

Starting from 22.06 this conf is enabled, to disable this operation on the GPU when using Spark 3.1.0 or
Starting from 22.06 this conf is enabled, to disable this operation on the GPU when using Spark 3.1.0 or
later, set
[`spark.rapids.sql.castFloatToDecimal.enabled`](configs.md#sql.castFloatToDecimal.enabled) to `false`

Expand Down Expand Up @@ -679,7 +671,7 @@ represents any number in the following ranges. In both cases the GPU returns `Do
default behavior in Apache Spark is to return `+Infinity` and `-Infinity`, respectively.

- `1.7976931348623158E308 <= x < 1.7976931348623159E308`
- `-1.7976931348623159E308 < x <= -1.7976931348623158E308`
- `-1.7976931348623159E308 < x <= -1.7976931348623158E308`

Also, the GPU does not support casting from strings containing hex values.

Expand Down Expand Up @@ -707,10 +699,10 @@ The following formats/patterns are supported on the GPU. Timezone of UTC is assu

### String to Timestamp

To allow casts from string to timestamp on the GPU, enable the configuration property
To allow casts from string to timestamp on the GPU, enable the configuration property
[`spark.rapids.sql.castStringToTimestamp.enabled`](configs.md#sql.castStringToTimestamp.enabled).

Casting from string to timestamp currently has the following limitations.
Casting from string to timestamp currently has the following limitations.

| Format or Pattern | Supported on GPU? |
| ------------------------------------------------------------------- | ------------------|
Expand Down Expand Up @@ -743,7 +735,7 @@ values.

ConstantFolding is an operator optimization rule in Catalyst that replaces expressions that can
be statically evaluated with their equivalent literal values. The RAPIDS Accelerator relies
on constant folding and parts of the query will not be accelerated if
on constant folding and parts of the query will not be accelerated if
`org.apache.spark.sql.catalyst.optimizer.ConstantFolding` is excluded as a rule.

### long/double to Timestamp
Expand Down
21 changes: 11 additions & 10 deletions integration_tests/src/main/python/parquet_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen):
data_path,
conf=writer_confs)

# we are limiting TimestampGen to avoid overflowing the INT96 value
# see https://github.com/rapidsai/cudf/issues/8070
@pytest.mark.parametrize('data_gen', [TimestampGen(end=datetime(1677, 9, 22, tzinfo=timezone.utc)),
TimestampGen(start=datetime(2262, 4, 11, tzinfo=timezone.utc))], ids=idfn)
def test_catch_int96_overflow(spark_tmp_path, data_gen):
def test_fix_for_int96_overflow(spark_tmp_path, data_gen):
jlowe marked this conversation as resolved.
Show resolved Hide resolved
data_path = spark_tmp_path + '/PARQUET_DATA'
confs = copy_and_update(writer_confs, {'spark.sql.parquet.outputTimestampType': 'INT96'})
assert_spark_exception(lambda: with_gpu_session(
lambda spark: unary_op_df(spark, data_gen).coalesce(1).write.parquet(data_path), conf=confs), "org.apache.spark.SparkException: Job aborted.")
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: unary_op_df(spark, data_gen).coalesce(1).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf = confs)


@pytest.mark.skipif(is_spark_340_or_later() or is_databricks122_or_later(), reason="`WriteFilesExec` is only supported in Spark 340+")
Expand Down Expand Up @@ -293,9 +294,9 @@ def writeParquetUpgradeCatchException(spark, df, data_path, spark_tmp_table_fact
assert e_info.match(r".*SparkUpgradeException.*")

# TODO - we are limiting the INT96 values, see https://github.com/rapidsai/cudf/issues/8070
@pytest.mark.parametrize('ts_write_data_gen',
[('INT96', TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))),
('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))),
@pytest.mark.parametrize('ts_write_data_gen',
[('INT96', TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))),
('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))),
('TIMESTAMP_MILLIS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc)))])
@pytest.mark.parametrize('rebase', ["CORRECTED","EXCEPTION"])
def test_ts_write_fails_datetime_exception(spark_tmp_path, ts_write_data_gen, spark_tmp_table_factory, rebase):
Expand Down Expand Up @@ -489,8 +490,8 @@ def generate_map_with_empty_validity(spark, path):
lambda spark, path: spark.read.parquet(path),
data_path)

@pytest.mark.parametrize('ts_write_data_gen', [('INT96', limited_int96()),
('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc))),
@pytest.mark.parametrize('ts_write_data_gen', [('INT96', limited_int96()),
('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc))),
('TIMESTAMP_MILLIS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1582, 1, 1, tzinfo=timezone.utc)))])
@pytest.mark.parametrize('date_time_rebase_write', ["CORRECTED"])
@pytest.mark.parametrize('date_time_rebase_read', ["EXCEPTION", "CORRECTED"])
Expand Down
Loading