@@ -331,7 +332,8 @@
+ compress="${dist.jar.compress}"
+ destfile="${dist.jar.name}"/>
@@ -450,6 +452,65 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-install-plugin</artifactId>
+        <version>3.0.1</version>
+        <executions>
+          <execution>
+            <id>default-install</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>install-parallel-worlds-jar</id>
+            <phase>install</phase>
+            <goals>
+              <goal>install-file</goal>
+            </goals>
+            <configuration>
+              <file>${dist.jar.name}</file>
+              <artifactId>${project.artifactId}</artifactId>
+              <classifier>${cuda.version}</classifier>
+              <groupId>${project.groupId}</groupId>
+              <version>${project.version}</version>
+              <packaging>jar</packaging>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <id>default-deploy</id>
+            <phase>none</phase>
+          </execution>
+          <execution>
+            <id>deploy-parallel-worlds-jar</id>
+            <phase>deploy</phase>
+            <goals>
+              <goal>deploy-file</goal>
+            </goals>
+            <configuration>
+              <file>${dist.jar.name}</file>
+              <url>file://${java.io.tmpdir}/m2-repo</url>
+              <artifactId>${project.artifactId}</artifactId>
+              <classifier>${cuda.version}</classifier>
+              <groupId>${project.groupId}</groupId>
+              <packaging>jar</packaging>
+
+              <version>${project.version}</version>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/dist/unshimmed-spark311.txt b/dist/unshimmed-spark311.txt
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/docs/compatibility.md b/docs/compatibility.md
index c63bdd47e02..10cfcfd0783 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -882,6 +882,8 @@ Casting from string to timestamp currently has the following limitations.
| `"yyyy-[M]M "` | Yes |
| `"yyyy-[M]M-[d]d"` | Yes |
| `"yyyy-[M]M-[d]d "` | Yes |
+| `"yyyy-[M]M-[d]dT[h]h:[m]m:[s]s[zone_id]"` | Partial [\[1\]](#Footnote1) |
+| `"yyyy-[M]M-[d]d [h]h:[m]m:[s]s[zone_id]"` | Partial [\[1\]](#Footnote1) |
| `"yyyy-[M]M-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]"` | Partial [\[1\]](#Footnote1) |
| `"yyyy-[M]M-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]"` | Partial [\[1\]](#Footnote1) |
| `"[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]"` | Partial [\[1\]](#Footnote1) |
@@ -892,8 +894,8 @@ Casting from string to timestamp currently has the following limitations.
| `"tomorrow"` | Yes |
| `"yesterday"` | Yes |
-- [1] The timestamp portion must have 6 digits for milliseconds.
- Only timezone 'Z' (UTC) is supported. Casting unsupported formats will result in null values.
+- [1] Leap seconds are not supported. If a zone_id is provided then only
+ timezone 'Z' (UTC) is supported. Casting unsupported formats will result in null values.
Spark is very lenient when casting from string to timestamp because all date and time components
are optional, meaning that input values such as `T`, `T2`, `:`, `::`, `1:`, `:1`, and `::1`
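For context, a minimal PySpark sketch (not part of this patch) of the newly documented second-precision formats; it assumes a session that already has the RAPIDS Accelerator loaded and uses the `spark.rapids.sql.castStringToTimestamp.enabled` setting that also appears in the test changes later in this diff:

```python
# Illustrative only: cast the newly supported "yyyy-[M]M-[d]d[ |T][h]h:[m]m:[s]s[zone_id]"
# string shapes to timestamp. Assumes the RAPIDS Accelerator is already configured
# for this SparkSession (plugin jar and spark.plugins settings not shown).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.rapids.sql.castStringToTimestamp.enabled", "true")

df = spark.createDataFrame(
    [("2022-08-15T01:02:03Z",),   # 'T' separator, UTC zone id
     ("2022-08-15 01:02:03",)],   # space separator, no zone id
    "a string")

# Per the footnote above, unsupported variants (e.g. a non-UTC zone_id) are
# expected to come back as nulls rather than raising an error.
df.select(col("a").cast(TimestampType())).show(truncate=False)
```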
diff --git a/docs/configs.md b/docs/configs.md
index b4598b315a8..edcf1bcc621 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -225,6 +225,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array|true|None|
spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None|
spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None|
+spark.rapids.sql.expression.FromUTCTimestamp|`from_utc_timestamp`|Render the input UTC timestamp in the input timezone|true|None|
spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
spark.rapids.sql.expression.GetArrayItem| |Gets the field at `ordinal` in the Array|true|None|
spark.rapids.sql.expression.GetArrayStructFields| |Extracts the `ordinal`-th fields of all array elements for the data with the type of array of struct|true|None|
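As an aside (not part of this patch), the new `spark.rapids.sql.expression.FromUTCTimestamp` entry gates the GPU version of `from_utc_timestamp`; a hedged sketch of using it from PySpark, assuming the plugin is already active for the session:

```python
# Illustrative only: from_utc_timestamp with the new expression config.
# Only time zones equivalent to UTC stay on the GPU (see supported_ops.md below);
# other zones fall back to the CPU, as the new fallback test in this diff shows.
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.rapids.sql.expression.FromUTCTimestamp", "true")  # default is true

df = spark.sql("SELECT timestamp'2022-08-15 01:02:03' AS ts")
df.select(f.from_utc_timestamp(f.col("ts"), "UTC")).show()
```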
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 811cbfa80a6..cdd8f6f1105 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -5442,7 +5442,7 @@ are limited.
|
|
|
-PS Because of Spark's inner workings the full range of decimal precision (even for 128-bit values) is not supported. |
+S |
|
|
|
@@ -6114,6 +6114,74 @@ are limited.
|
+FromUTCTimestamp |
+`from_utc_timestamp` |
+Render the input UTC timestamp in the input timezone |
+None |
+project |
+timestamp |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+PS UTC is only supported TZ for TIMESTAMP |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+
+
+timezone |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+PS Only timezones equivalent to UTC are supported |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+
+
+result |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+PS UTC is only supported TZ for TIMESTAMP |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+ |
+
+
FromUnixTime |
`from_unixtime` |
Get the string from a unix timestamp |
@@ -6250,6 +6318,32 @@ are limited.
NS |
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
GetArrayStructFields |
|
Extracts the `ordinal`-th fields of all array elements for the data with the type of array of struct |
@@ -6365,32 +6459,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
GetMapValue |
|
Gets Value from a Map based on a key |
@@ -6706,6 +6774,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
GreaterThanOrEqual |
`>=` |
>= operator |
@@ -6838,32 +6932,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
Greatest |
`greatest` |
Returns the greatest value of all parameters, skipping null values |
@@ -7115,6 +7183,32 @@ are limited.
NS |
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
In |
`in` |
IN operator |
@@ -7230,32 +7324,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
InitCap |
`initcap` |
Returns str with the first letter of each word in uppercase. All other letters are in lowercase |
@@ -7496,6 +7564,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
IsNotNull |
`isnotnull` |
Checks if a value is not null |
@@ -7637,32 +7731,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
KnownFloatingPointNormalized |
|
Tag to prevent redundant normalization |
@@ -7914,6 +7982,32 @@ are limited.
NS |
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
LastDay |
`last_day` |
Returns the last day of the month which the date belongs to |
@@ -8050,32 +8144,6 @@ are limited.
NS |
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
Least |
`least` |
Returns the least value of all parameters, skipping null values |
@@ -8302,6 +8370,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
LessThanOrEqual |
`<=` |
<= operator |
@@ -8434,32 +8528,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
Like |
`like` |
Like |
@@ -8670,6 +8738,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
Log1p |
`log1p` |
Natural log 1 + expr |
@@ -8832,32 +8926,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
Lower |
`lower`, `lcase` |
String lowercase operator |
@@ -9046,6 +9114,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
MapFilter |
`map_filter` |
Filters entries in a map using the function |
@@ -9208,32 +9302,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
Md5 |
`md5` |
MD5 hash operator |
@@ -9533,6 +9601,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
Murmur3Hash |
`hash` |
Murmur3 hash operator |
@@ -9580,32 +9674,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
NaNvl |
`nanvl` |
Evaluates to `left` iff left is not NaN, `right` otherwise |
@@ -9905,6 +9973,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
Or |
`or` |
Logical OR |
@@ -10037,32 +10131,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
PercentRank |
`percent_rank` |
Window function that returns the percent rank value within the aggregation window |
@@ -10126,7 +10194,7 @@ are limited.
|
|
|
-S |
+PS decimals with precision 38 are not supported |
|
|
|
@@ -10357,6 +10425,32 @@ are limited.
|
+Expression |
+SQL Functions(s) |
+Description |
+Notes |
+Context |
+Param/Output |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
PreciseTimestampConversion |
|
Expression used internally to convert the TimestampType to Long and back without losing precision, i.e. in microseconds. Used in time windowing |
@@ -10404,32 +10498,6 @@ are limited.
|
-Expression |
-SQL Functions(s) |
-Description |
-Notes |
-Context |
-Param/Output |
-BOOLEAN |
-BYTE |
-SHORT |
-INT |
-LONG |
-FLOAT |
-DOUBLE |
-DATE |
-TIMESTAMP |
-STRING |
-DECIMAL |
-NULL |
-BINARY |
-CALENDAR |
-ARRAY |
-MAP |
-STRUCT |
-UDT |
-
-
PromotePrecision |
|
PromotePrecision before arithmetic operations between DecimalType data |
@@ -16486,8 +16554,8 @@ are limited.
S |
S |
S |
-PS Input must not contain NaNs and spark.rapids.sql.hasNans must be false. |
-PS Input must not contain NaNs and spark.rapids.sql.hasNans must be false. |
+S |
+S |
S |
PS UTC is only supported TZ for TIMESTAMP |
S |
@@ -16529,8 +16597,8 @@ are limited.
S |
S |
S |
-PS Input must not contain NaNs and spark.rapids.sql.hasNans must be false. |
-PS Input must not contain NaNs and spark.rapids.sql.hasNans must be false. |
+S |
+S |
S |
PS UTC is only supported TZ for TIMESTAMP |
S |
@@ -16572,8 +16640,8 @@ are limited.
S |
S |
S |
-PS Input must not contain NaNs and spark.rapids.sql.hasNans must be false. |
-PS Input must not contain NaNs and spark.rapids.sql.hasNans must be false. |
+S |
+S |
S |
PS UTC is only supported TZ for TIMESTAMP |
S |
diff --git a/integration_tests/README.md b/integration_tests/README.md
index c1151a7d02e..7da0ec89ca9 100644
--- a/integration_tests/README.md
+++ b/integration_tests/README.md
@@ -37,20 +37,20 @@ It is recommended that you use `pyenv` to manage Python installations.
- Follow instructions to use the right method of installation described
[here](https://github.com/pyenv/pyenv#installation)
- Verify that `pyenv` is set correctly
-
+
```shell script
- which pyenv
+ which pyenv
```
- Using `pyenv` to set Python installation
- To check versions to be installed (will return a long list)
-
+
```shell script
ls ~/.pyenv/versions/
```
- To install a specific version from the available list
-
+
```shell script
pyenv install 3.X.Y
```
@@ -116,7 +116,7 @@ You can install all the dependencies using `pip` by running the following comman
### Installing Spark
You need to install spark-3.x and set `$SPARK_HOME/bin` to your `$PATH`, where
-`SPARK_HOME` points to the directory of a runnable Spark distribution.
+`SPARK_HOME` points to the directory of a runnable Spark distribution.
This can be done in the following three steps:
1. Choose the appropriate way to create Spark distribution:
@@ -156,10 +156,10 @@ Make sure that you compile the plugin against the same version of Spark that it
Tests will run as a part of the maven build if you have the environment variable `SPARK_HOME` set.
The suggested way to run these tests is to use the shell-script file located in the
- integration_tests folder called [run_pyspark_from_build.sh](run_pyspark_from_build.sh). This script takes
-care of some of the flags that are required to run the tests which will have to be set for the
-plugin to work. It will be very useful to read the contents of the
-[run_pyspark_from_build.sh](run_pyspark_from_build.sh) to get a better insight
+ integration_tests folder called [run_pyspark_from_build.sh](run_pyspark_from_build.sh). This script takes
+care of some of the flags that are required to run the tests which will have to be set for the
+plugin to work. It will be very useful to read the contents of the
+[run_pyspark_from_build.sh](run_pyspark_from_build.sh) to get a better insight
into what is needed as we constantly keep working on to improve and expand the plugin-support.
The python tests run with pytest and the script honors pytest parameters. Some handy flags are:
@@ -221,16 +221,18 @@ To run the tests separate from the build go to the `integration_tests` directory
`runtests.py` through `spark-submit`, but if you want to run the tests in parallel with
`pytest-xdist` you will need to submit it as a regular python application and have `findspark`
installed. Be sure to include the necessary jars for the RAPIDS plugin either with
-`spark-submit` or with the cluster when it is
+`spark-submit` or with the cluster when it is
[setup](../docs/get-started/getting-started-on-prem.md).
-The command line arguments to `runtests.py` are the same as for
+The command line arguments to `runtests.py` are the same as for
[pytest](https://docs.pytest.org/en/latest/usage.html). The only reason we have a separate script
is that `spark-submit` uses python if the file name ends with `.py`.
If you want to configure the Spark cluster you may also set environment variables for the tests.
The name of the env var should be in the form `"PYSP_TEST_" + conf_key.replace('.', '_')`. Linux
-does not allow '.' in the name of an environment variable so we replace it with an underscore. As
-Spark configs avoid this character we have no other special processing.
+does not allow '.' in the name of an environment variable so we replace it with an underscore. If
+the property contains an underscore, substitute '__' for each original '_'.
+For example, `spark.sql.catalog.spark_catalog` is represented by the environment variable
+`PYSP_TEST_spark_sql_catalog_spark__catalog`.
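The naming rule added above is mechanical, so a small helper makes it concrete. This is a hypothetical sketch (`conf_to_env` is not a function in the repo); it only restates the "`.` becomes `_`, original `_` becomes `__`" convention:

```python
# Hypothetical helper illustrating the PYSP_TEST_ environment variable naming rule.
def conf_to_env(conf_key: str) -> str:
    # Escape pre-existing underscores first, then replace the dots.
    return "PYSP_TEST_" + conf_key.replace("_", "__").replace(".", "_")

assert conf_to_env("spark.rapids.memory.gpu.allocSize") == \
    "PYSP_TEST_spark_rapids_memory_gpu_allocSize"
assert conf_to_env("spark.sql.catalog.spark_catalog") == \
    "PYSP_TEST_spark_sql_catalog_spark__catalog"
```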
We also have a large number of integration tests that currently run as a part of the unit tests
using scala test. Those are in the `src/test/scala` sub-directory and depend on the testing
@@ -252,7 +254,7 @@ It is recommended that you use `spark-shell` and the scalatest shell to run each
individually, so you don't risk running unit tests along with the integration tests.
http://www.scalatest.org/user_guide/using_the_scalatest_shell
-```shell
+```shell
spark-shell --jars rapids-4-spark-tests_2.12-22.10.0-SNAPSHOT-tests.jar,rapids-4-spark-integration-tests_2.12-22.10.0-SNAPSHOT-tests.jar,scalatest_2.12-3.0.5.jar,scalactic_2.12-3.0.5.jar
```
@@ -366,8 +368,8 @@ cudf_udf tests needs a couple of different settings, they may need to run separa
To enable cudf_udf tests, need following pre requirements:
* Install cuDF Python library on all the nodes running executors. The instruction could be found at [here](https://rapids.ai/start.html). Please follow the steps to choose the version based on your environment and install the cuDF library via Conda or use other ways like building from source.
* Disable the GPU exclusive mode on all the nodes running executors. The sample command is `sudo nvidia-smi -c DEFAULT`
-
-To run cudf_udf tests, need following configuration changes:
+
+To run cudf_udf tests, need following configuration changes:
* Add configurations `--py-files` and `spark.executorEnv.PYTHONPATH` to specify the plugin jar for python modules 'rapids/daemon' 'rapids/worker'.
* Decrease `spark.rapids.memory.gpu.allocFraction` to reserve enough GPU memory for Python processes in case of out-of-memory.
* Add `spark.rapids.python.concurrentPythonWorkers` and `spark.rapids.python.memory.gpu.allocFraction` to reserve enough GPU memory for Python processes in case of out-of-memory.
@@ -380,7 +382,7 @@ $SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-22.10.0-SNAPSHOT-cuda11
### Enabling fuzz tests
-Fuzz tests are intended to find more corner cases in testing. We disable them by default because they might randomly fail.
+Fuzz tests are intended to find more corner cases in testing. We disable them by default because they might randomly fail.
The tests can be enabled by appending the option `--fuzz_test` to the command.
* `--fuzz_test` (enable the fuzz tests when provided, and remove this option if you want to disable the tests)
@@ -459,33 +461,33 @@ When support for a new operator is added to the Rapids Accelerator for Spark, or
to support more data types, it is recommended that the following conditions be covered in its corresponding integration tests:
### 1. Cover all supported data types
-Ensure that tests cover all data types supported by the added operation. An exhaustive list of data types supported in
+Ensure that tests cover all data types supported by the added operation. An exhaustive list of data types supported in
Apache Spark is available [here](https://spark.apache.org/docs/latest/sql-ref-datatypes.html). These include:
- * Numeric Types
- * `ByteType`
- * `ShortType`
+ * Numeric Types
+ * `ByteType`
+ * `ShortType`
* `IntegerType`
* `LongType`
* `FloatType`
* `DoubleType`
* `DecimalType`
- * Strings
- * `StringType`
+ * Strings
+ * `StringType`
* `VarcharType`
- * Binary (`BinaryType`)
+ * Binary (`BinaryType`)
* Booleans (`BooleanType`)
- * Chrono Types
- * `TimestampType`
+ * Chrono Types
+ * `TimestampType`
* `DateType`
* `Interval`
- * Complex Types
+ * Complex Types
* `ArrayType`
* `StructType`
* `MapType`
`data_gen.py` provides `DataGen` classes that help generate test data in integration tests.
-The `assert_gpu_and_cpu_are_equal_collect()` function from `asserts.py` may be used to compare that an operator in
+The `assert_gpu_and_cpu_are_equal_collect()` function from `asserts.py` may be used to compare that an operator in
the Rapids Accelerator produces the same results as Apache Spark, for a test query.
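A hedged sketch of that pattern, mirroring the style of the tests elsewhere in this diff (the test name and the chosen operator are placeholders; the usual `data_gen.py`/`asserts.py` imports are assumed):

```python
# Sketch: parametrize one test over several supported data types and compare
# GPU results against CPU (Apache Spark) results for the same query.
import pytest
import pyspark.sql.functions as f
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *

@pytest.mark.parametrize('data_gen', [byte_gen, int_gen, long_gen, double_gen,
                                      string_gen, decimal_gen_64bit], ids=idfn)
def test_example_operator_all_types(data_gen):
    # isnull is only a stand-in for the operator under test.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen).select(f.isnull(f.col('a'))))
```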
For data types that are not currently supported for an operator in the Rapids Accelerator,
@@ -505,17 +507,17 @@ E.g.
The `ArrayGen` and `StructGen` classes in `data_gen.py` can be configured to support arbitrary nesting.
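For instance (illustrative only), an array-of-struct column can be produced by nesting the generators:

```python
# Sketch: nested data generator for an ARRAY<STRUCT<c0: INT, c1: STRING>> column.
from data_gen import ArrayGen, StructGen, IntegerGen, StringGen

nested_gen = ArrayGen(StructGen([('c0', IntegerGen()), ('c1', StringGen())]))
```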
### 3. Literal (i.e. Scalar) values
-Operators and expressions that support literal operands need to be tested with literal inputs, of all
-supported types from 1 and 2, above.
+Operators and expressions that support literal operands need to be tested with literal inputs, of all
+supported types from 1 and 2, above.
For instance, `SUM()` supports numeric columns (e.g. `SUM(a + b)`), or scalars (e.g. `SUM(20)`).
Similarly, `COUNT()` supports the following:
* Columns: E.g. `COUNT(a)` to count non-null rows for column `a`
* Scalars: E.g. `COUNT(1)` to count all rows (including nulls)
* `*`: E.g. `COUNT(*)`, functionally equivalent to `COUNT(1)`
It is advised that tests be added for all applicable literal types, for an operator.
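A hedged sketch of literal-operand coverage (hypothetical test; it mirrors the `f.lit(...).cast(...)` usage in the arithmetic tests later in this diff):

```python
# Sketch: exercise an operation with scalar (literal) operands as well as columns.
import pytest
import pyspark.sql.functions as f
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *

@pytest.mark.parametrize('data_gen', [int_gen, long_gen, decimal_gen_64bit], ids=idfn)
def test_example_add_with_literals(data_gen):
    data_type = data_gen.data_type
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, data_gen).select(
            f.col('a') + f.col('b'),                    # column + column
            f.col('a') + f.lit(100).cast(data_type),    # column + scalar
            f.lit(None).cast(data_type) + f.col('b')))  # null literal + column
```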
-
+
Note that for most operations, if all inputs are literal values, the Spark Catalyst optimizer will evaluate
-the expression during the logical planning phase of query compilation, via
+the expression during the logical planning phase of query compilation, via
[Constant Folding](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-ConstantFolding.html)
E.g. Consider this query:
```sql
@@ -529,18 +531,18 @@ need not necessarily add more test coverage.
Ensure that the test data accommodates null values for input columns. This includes null values in columns
and in literal inputs.
-Null values in input columns are a frequent source of bugs in the Rapids Accelerator for Spark,
-because of mismatches in null-handling and semantics, between RAPIDS `libcudf` (on which
-the Rapids Accelerator relies heavily), and Apache Spark.
+Null values in input columns are a frequent source of bugs in the Rapids Accelerator for Spark,
+because of mismatches in null-handling and semantics, between RAPIDS `libcudf` (on which
+the Rapids Accelerator relies heavily), and Apache Spark.
-Tests for aggregations (including group-by, reductions, and window aggregations) should cover cases where
+Tests for aggregations (including group-by, reductions, and window aggregations) should cover cases where
some rows are null, and where *all* input rows are null.
Apart from null rows in columns of primitive types, the following conditions must be covered for nested types:
* Null rows at the "top" level for `Array`/`Struct` columns. E.g. `[ [1,2], [3], ∅, [4,5,6] ]`.
* Non-null rows containing null elements in the child column. E.g. `[ [1,2], [3,∅], ∅, [4,∅,6] ]`.
- * All null rows at a nested level. E.g.
+ * All null rows at a nested level. E.g.
* All null list rows: `[ ∅, ∅, ∅, ∅ ]`
* All null elements within list rows: `[ [∅,∅], [∅,∅], [∅,∅], [∅,∅] ]`
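A hedged sketch of generating those null patterns with the existing generators (the weighted `nullable=(True, <percent>)` form is the one used elsewhere in this diff; the exact weights here are illustrative):

```python
# Sketch: an array column with null list rows and null elements in the child
# column, matching the null patterns listed above.
from data_gen import ArrayGen, IntegerGen, gen_df

null_heavy_arrays = ArrayGen(IntegerGen(nullable=(True, 20.0)),  # ~20% null elements
                             nullable=(True, 20.0))              # ~20% null list rows

def make_df(spark):
    return gen_df(spark, [('a', null_heavy_arrays)], length=2048)
```

Raising the null weights toward 100 should approximate the all-null variants in the list above.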
@@ -579,10 +581,10 @@ describes this with examples. Operations should be tested with multiple bit-repr
The `FloatGen` and `DoubleGen` data generators in `integration_tests/src/main/python/data_gen.py` can be configured
to generate the special float/double values mentioned above.
-For most basic floating-point operations like addition, subtraction, multiplication, and division the plugin will
+For most basic floating-point operations like addition, subtraction, multiplication, and division the plugin will
produce a bit for bit identical result as Spark does. For some other functions (like `sin`, `cos`, etc.), the output may
differ slightly, but remain within the rounding error inherent in floating-point calculations. Certain aggregations
-might compound those differences. In those cases, the `@approximate_float` test annotation may be used to mark tests
+might compound those differences. In those cases, the `@approximate_float` test annotation may be used to mark tests
to use "approximate" comparisons for floating-point values.
Refer to the "Floating Point" section of [compatibility.md](../docs/compatibility.md) for details.
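A hedged sketch of the `@approximate_float` usage described above (hypothetical test; `sin` is just a stand-in for an operation whose GPU result may differ in the last bits):

```python
# Sketch: use approximate comparison for a floating-point result that is not
# guaranteed to be bit-for-bit identical between GPU and CPU.
import pytest
import pyspark.sql.functions as f
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import approximate_float

@approximate_float
@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
def test_example_sin(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen).select(f.sin(f.col('a'))))
```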
@@ -590,11 +592,11 @@ Refer to the "Floating Point" section of [compatibility.md](../docs/compatibilit
### 8. Special values in timestamp columns
Ensure date/timestamp columns include dates before the [epoch](https://en.wikipedia.org/wiki/Epoch_(computing)).
-Apache Spark supports dates/timestamps between `0001-01-01 00:00:00.000000` and `9999-12-31 23:59:59.999999`, but at
+Apache Spark supports dates/timestamps between `0001-01-01 00:00:00.000000` and `9999-12-31 23:59:59.999999`, but at
values close to the minimum value, the format used in Apache Spark causes rounding errors. To avoid such problems,
it is recommended that the minimum value used in a test not actually equal `0001-01-01`. For instance, `0001-01-03` is
acceptable.
-It is advised that `DateGen` and `TimestampGen` classes from `data_gen.py` be used to generate valid
-(proleptic Gregorian calendar) dates when testing operators that work on dates. This data generator respects
+It is advised that `DateGen` and `TimestampGen` classes from `data_gen.py` be used to generate valid
+(proleptic Gregorian calendar) dates when testing operators that work on dates. This data generator respects
the valid boundaries for dates and timestamps.
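A hedged sketch of constraining the generators as recommended above (the `start`/`end` keyword arguments are an assumption here; check `data_gen.py` for the exact signatures):

```python
# Sketch: date/timestamp generators that stay inside Spark's valid range and
# start a couple of days after the minimum, as recommended above.
from datetime import date, datetime, timezone
from data_gen import DateGen, TimestampGen

safe_date_gen = DateGen(start=date(1, 1, 3), end=date(9999, 12, 31))
safe_ts_gen = TimestampGen(start=datetime(1, 1, 3, tzinfo=timezone.utc))
```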
diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh
index 3ac660c5c45..d3b9adacb57 100755
--- a/integration_tests/run_pyspark_from_build.sh
+++ b/integration_tests/run_pyspark_from_build.sh
@@ -25,6 +25,7 @@ then
>&2 echo "SPARK_HOME IS NOT SET CANNOT RUN PYTHON INTEGRATION TESTS..."
else
echo "WILL RUN TESTS WITH SPARK_HOME: ${SPARK_HOME}"
+    [[ ! -x "$(command -v zip)" ]] && { echo "failed to find zip command in $PATH"; exit 1; }
# Spark 3.1.1 includes https://github.com/apache/spark/pull/31540
# which helps with spurious task failures as observed in our tests. If you are running
# Spark versions before 3.1.1, this sets the spark.max.taskFailures to 4 to allow for
@@ -258,7 +259,14 @@ else
# If you want to change the amount of GPU memory allocated you have to change it here
# and where TEST_PARALLEL is calculated
- export PYSP_TEST_spark_rapids_memory_gpu_allocSize='1536m'
+ if [[ -n "${PYSP_TEST_spark_rapids_memory_gpu_allocSize}" ]]; then
+ >&2 echo "#### WARNING: using externally set" \
+ "PYSP_TEST_spark_rapids_memory_gpu_allocSize" \
+ "${PYSP_TEST_spark_rapids_memory_gpu_allocSize}." \
+ "If needed permanently in CI please file an issue to accommodate" \
+ "for new GPU memory requirements ####"
+ fi
+ export PYSP_TEST_spark_rapids_memory_gpu_allocSize=${PYSP_TEST_spark_rapids_memory_gpu_allocSize:-'1536m'}
if ((${#TEST_PARALLEL_OPTS[@]} > 0));
then
@@ -266,11 +274,17 @@ else
else
# We set the GPU memory size to be a constant value even if only running with a parallelism of 1
# because it helps us have consistent test runs.
+ jarOpts=()
if [[ -n "$PYSP_TEST_spark_jars" ]]; then
- # `spark.jars` is the same as `--jars`, e.g.: --jars a.jar,b.jar...
- jarOpts=(--conf spark.jars="${PYSP_TEST_spark_jars}")
- elif [[ -n "$PYSP_TEST_spark_driver_extraClassPath" ]]; then
- jarOpts=(--driver-class-path "${PYSP_TEST_spark_driver_extraClassPath}")
+ jarOpts+=(--jars "${PYSP_TEST_spark_jars}")
+ fi
+
+ if [[ -n "$PYSP_TEST_spark_jars_packages" ]]; then
+ jarOpts+=(--packages "${PYSP_TEST_spark_jars_packages}")
+ fi
+
+ if [[ -n "$PYSP_TEST_spark_driver_extraClassPath" ]]; then
+ jarOpts+=(--driver-class-path "${PYSP_TEST_spark_driver_extraClassPath}")
fi
driverJavaOpts="$PYSP_TEST_spark_driver_extraJavaOptions"
@@ -281,6 +295,7 @@ else
unset PYSP_TEST_spark_driver_extraClassPath
unset PYSP_TEST_spark_driver_extraJavaOptions
unset PYSP_TEST_spark_jars
+ unset PYSP_TEST_spark_jars_packages
unset PYSP_TEST_spark_rapids_memory_gpu_allocSize
exec "$SPARK_HOME"/bin/spark-submit "${jarOpts[@]}" \
diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index f86f6294180..9e2e3e06a38 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -27,12 +27,17 @@
# No overflow gens here because we just focus on verifying the fallback to CPU when
# enabling ANSI mode. But overflows will fail the tests because CPU runs raise
# exceptions.
-_no_overflow_multiply_gens = [
+_no_overflow_multiply_gens_for_fallback = [
ByteGen(min_val = 1, max_val = 10, special_cases=[]),
ShortGen(min_val = 1, max_val = 100, special_cases=[]),
IntegerGen(min_val = 1, max_val = 1000, special_cases=[]),
LongGen(min_val = 1, max_val = 3000, special_cases=[])]
+
+_no_overflow_multiply_gens = _no_overflow_multiply_gens_for_fallback + [
+ DecimalGen(10, 0),
+ DecimalGen(19, 0)]
+
_decimal_gen_7_7 = DecimalGen(precision=7, scale=7)
_decimal_gen_18_0 = DecimalGen(precision=18, scale=0)
_decimal_gen_18_3 = DecimalGen(precision=18, scale=3)
@@ -43,11 +48,14 @@
_decimal_gen_38_10 = DecimalGen(precision=38, scale=10)
_decimal_gen_38_neg10 = DecimalGen(precision=38, scale=-10)
-_arith_data_gens_diff_precision_scale_and_no_neg_scale = [
+_arith_data_gens_diff_precision_scale_and_no_neg_scale_no_38_0 = [
decimal_gen_32bit, decimal_gen_64bit, _decimal_gen_18_0, decimal_gen_128bit,
- _decimal_gen_30_2, _decimal_gen_36_5, _decimal_gen_38_0, _decimal_gen_38_10
+ _decimal_gen_30_2, _decimal_gen_36_5, _decimal_gen_38_10
]
+_arith_data_gens_diff_precision_scale_and_no_neg_scale = \
+ _arith_data_gens_diff_precision_scale_and_no_neg_scale_no_38_0 + [_decimal_gen_38_0]
+
_arith_decimal_gens_no_neg_scale = _arith_data_gens_diff_precision_scale_and_no_neg_scale + [_decimal_gen_7_7]
_arith_decimal_gens = _arith_decimal_gens_no_neg_scale + [
@@ -58,6 +66,12 @@
_arith_data_gens_no_neg_scale = numeric_gens + _arith_decimal_gens_no_neg_scale
+_arith_decimal_gens_no_neg_scale_38_0_overflow = \
+ _arith_data_gens_diff_precision_scale_and_no_neg_scale_no_38_0 + [
+ _decimal_gen_7_7,
+ pytest.param(_decimal_gen_38_0, marks=pytest.mark.skipif(
+ is_spark_330_or_later(), reason='This case overflows in Spark 3.3.0+'))]
+
def _get_overflow_df(spark, data, data_type, expr):
return spark.createDataFrame(
SparkContext.getOrCreate().parallelize([data]),
@@ -114,19 +128,24 @@ def test_subtraction_ansi_no_overflow(data_gen):
@pytest.mark.parametrize('data_gen', numeric_gens + [
decimal_gen_32bit_neg_scale, decimal_gen_32bit, _decimal_gen_7_7,
- DecimalGen(precision=8, scale=8), decimal_gen_64bit, _decimal_gen_18_3], ids=idfn)
+ DecimalGen(precision=8, scale=8), decimal_gen_64bit, _decimal_gen_18_3,
+ _decimal_gen_38_10,
+ _decimal_gen_38_neg10
+ ], ids=idfn)
def test_multiplication(data_gen):
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
+ f.col('a'), f.col('b'),
f.col('a') * f.lit(100).cast(data_type),
f.lit(-12).cast(data_type) * f.col('b'),
f.lit(None).cast(data_type) * f.col('a'),
f.col('b') * f.lit(None).cast(data_type),
- f.col('a') * f.col('b')))
+ f.col('a') * f.col('b')
+ ))
@allow_non_gpu('ProjectExec', 'Alias', 'Multiply', 'Cast')
-@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn)
+@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens_for_fallback, ids=idfn)
def test_multiplication_fallback_when_ansi_enabled(data_gen):
assert_gpu_fallback_collect(
lambda spark : binary_op_df(spark, data_gen).select(
@@ -134,7 +153,7 @@ def test_multiplication_fallback_when_ansi_enabled(data_gen):
'Multiply',
conf=ansi_enabled_conf)
-@pytest.mark.parametrize('data_gen', [float_gen, double_gen, decimal_gen_32bit], ids=idfn)
+@pytest.mark.parametrize('data_gen', [float_gen, double_gen, decimal_gen_32bit, DecimalGen(19, 0)], ids=idfn)
def test_multiplication_ansi_enabled(data_gen):
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
@@ -143,8 +162,18 @@ def test_multiplication_ansi_enabled(data_gen):
f.col('a') * f.col('b')),
conf=ansi_enabled_conf)
-@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 5), DecimalGen(6, 4), DecimalGen(5, 4), DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(3, -2), DecimalGen(16, 7)], ids=idfn)
-@pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3)], ids=idfn)
+def test_multiplication_ansi_overflow():
+ exception_str = 'ArithmeticException'
+ assert_gpu_and_cpu_error(
+ lambda spark : unary_op_df(spark, DecimalGen(38, 0)).selectExpr("a * " + "9"*38 + " as ret").collect(),
+ ansi_enabled_conf,
+ exception_str)
+
+@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 5), DecimalGen(6, 4), DecimalGen(5, 4), DecimalGen(5, 3),
+ DecimalGen(4, 2), DecimalGen(3, -2), DecimalGen(16, 7), DecimalGen(19, 0),
+ DecimalGen(30, 10)], ids=idfn)
+@pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3),
+ DecimalGen(30, 12), DecimalGen(3, -3), DecimalGen(27, 7), DecimalGen(20, -3)], ids=idfn)
def test_multiplication_mixed(lhs, rhs):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, lhs, rhs).select(
@@ -161,7 +190,7 @@ def test_float_multiplication_mixed(lhs, rhs):
@pytest.mark.parametrize('data_gen', [double_gen, decimal_gen_32bit_neg_scale, DecimalGen(6, 3),
DecimalGen(5, 5), DecimalGen(6, 0), DecimalGen(7, 4), DecimalGen(15, 0), DecimalGen(18, 0),
- DecimalGen(17, 2), DecimalGen(16, 4)], ids=idfn)
+ DecimalGen(17, 2), DecimalGen(16, 4), DecimalGen(38, 21), DecimalGen(21, 17), DecimalGen(3, -2)], ids=idfn)
def test_division(data_gen):
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
@@ -172,21 +201,33 @@ def test_division(data_gen):
f.col('b') / f.lit(None).cast(data_type),
f.col('a') / f.col('b')))
-@allow_non_gpu('ProjectExec', 'Alias', 'Divide', 'Cast', 'PromotePrecision', 'CheckOverflow')
-@pytest.mark.parametrize('data_gen', [DecimalGen(38, 21), DecimalGen(21, 17)], ids=idfn)
-def test_division_fallback_on_decimal(data_gen):
- assert_gpu_fallback_collect(
- lambda spark : binary_op_df(spark, data_gen).select(
- f.col('a') / f.col('b')),
- 'Divide')
-
@pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(4, 1), DecimalGen(5, 0), DecimalGen(5, 1), DecimalGen(10, 5)], ids=idfn)
@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
def test_division_mixed(lhs, rhs):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, lhs, rhs).select(
+ f.col('a'), f.col('b'),
f.col('a') / f.col('b')))
+# Spark has some problems with some decimal operations where it can try to generate a type that is invalid (scale > precision) which results in an error
+# instead of increasing the precision. So we have a second test that deals with a few of these use cases
+@pytest.mark.parametrize('rhs', [DecimalGen(30, 10), DecimalGen(28, 18)], ids=idfn)
+@pytest.mark.parametrize('lhs', [DecimalGen(27, 7), DecimalGen(20, -3)], ids=idfn)
+def test_division_mixed_larger_dec(lhs, rhs):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : two_col_df(spark, lhs, rhs).select(
+ f.col('a'), f.col('b'),
+ f.col('a') / f.col('b')))
+
+def test_special_decimal_division():
+ for precision in range(1, 39):
+ for scale in range(-3, precision + 1):
+ print("PRECISION " + str(precision) + " SCALE " + str(scale))
+ data_gen = DecimalGen(precision, scale)
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : two_col_df(spark, data_gen, data_gen).select(
+ f.col('a') / f.col('b')))
+
@approximate_float # we should get the perfectly correct answer for floats except when casting a decimal to a float in some corner cases.
@pytest.mark.parametrize('rhs', [float_gen, double_gen], ids=idfn)
@pytest.mark.parametrize('lhs', [DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
@@ -196,38 +237,6 @@ def test_float_division_mixed(lhs, rhs):
f.col('a') / f.col('b')),
conf={'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})
-@ignore_order
-@pytest.mark.parametrize('rhs,rhs_type', [
- (DecimalGen(15, 3), DecimalType(30, 10)),
- (DecimalGen(10, 2), DecimalType(28, 18))], ids=idfn)
-@pytest.mark.parametrize('lhs,lhs_type', [
- (DecimalGen(15, 3), DecimalType(27, 7)),
- (DecimalGen(3, -3), DecimalType(20, -3))], ids=idfn)
-def test_decimal_division_mixed_no_overflow_guarantees(lhs, lhs_type, rhs, rhs_type):
- assert_gpu_and_cpu_are_equal_collect(
- lambda spark : two_col_df(spark, lhs, rhs)\
- .withColumn('lhs', f.col('a').cast(lhs_type))\
- .withColumn('rhs', f.col('b').cast(rhs_type))\
- .repartition(1)\
- .select(f.col('lhs'), f.col('rhs'), f.col('lhs') / f.col('rhs')),
- conf={'spark.rapids.sql.decimalOverflowGuarantees': 'false'})
-
-@ignore_order
-@pytest.mark.parametrize('rhs,rhs_type', [
- (DecimalGen(15, 3), DecimalType(30, 10)),
- (DecimalGen(10, 2), DecimalType(28, 9))], ids=idfn)
-@pytest.mark.parametrize('lhs,lhs_type', [
- (DecimalGen(10, 3), DecimalType(27, 7)),
- (DecimalGen(3, -3), DecimalType(20, -3))], ids=idfn)
-def test_decimal_multiplication_mixed_no_overflow_guarantees(lhs, lhs_type, rhs, rhs_type):
- assert_gpu_and_cpu_are_equal_collect(
- lambda spark : two_col_df(spark, lhs, rhs)\
- .withColumn('lhs', f.col('a').cast(lhs_type))\
- .withColumn('rhs', f.col('b').cast(rhs_type))\
- .repartition(1)\
- .select(f.col('lhs'), f.col('rhs'), f.col('lhs') * f.col('rhs')),
- conf={'spark.rapids.sql.decimalOverflowGuarantees': 'false'})
-
@pytest.mark.parametrize('data_gen', integral_gens + [
decimal_gen_32bit, decimal_gen_64bit, _decimal_gen_7_7, _decimal_gen_18_3, _decimal_gen_30_2,
_decimal_gen_36_5, _decimal_gen_38_0], ids=idfn)
@@ -259,7 +268,14 @@ def test_mod(data_gen):
f.col('b') % f.lit(None).cast(data_type),
f.col('a') % f.col('b')))
-@pytest.mark.parametrize('data_gen', _arith_data_gens_no_neg_scale, ids=idfn)
+# pmod currently falls back for Decimal(precision=38)
+# https://github.com/NVIDIA/spark-rapids/issues/6336
+_pmod_gens = numeric_gens + [ decimal_gen_32bit, decimal_gen_64bit, _decimal_gen_18_0, decimal_gen_128bit,
+ _decimal_gen_30_2, _decimal_gen_36_5,
+ DecimalGen(precision=37, scale=0), DecimalGen(precision=37, scale=10),
+ _decimal_gen_7_7]
+
+@pytest.mark.parametrize('data_gen', _pmod_gens, ids=idfn)
def test_pmod(data_gen):
string_type = to_cast_string(data_gen.data_type)
assert_gpu_and_cpu_are_equal_collect(
@@ -270,6 +286,19 @@ def test_pmod(data_gen):
'pmod(b, cast(null as {}))'.format(string_type),
'pmod(a, b)'))
+@allow_non_gpu("ProjectExec", "Pmod")
+@pytest.mark.parametrize('data_gen', [_decimal_gen_38_0, _decimal_gen_38_10], ids=idfn)
+def test_pmod_fallback(data_gen):
+ string_type = to_cast_string(data_gen.data_type)
+ assert_gpu_fallback_collect(
+ lambda spark : binary_op_df(spark, data_gen).selectExpr(
+ 'pmod(a, cast(100 as {}))'.format(string_type),
+ 'pmod(cast(-12 as {}), b)'.format(string_type),
+ 'pmod(cast(null as {}), a)'.format(string_type),
+ 'pmod(b, cast(null as {}))'.format(string_type),
+ 'pmod(a, b)'),
+ "Pmod")
+
# test pmod(Long.MinValue, -1) = 0 and Long.MinValue % -1 = 0, should not throw
def test_mod_pmod_long_min_value():
assert_gpu_and_cpu_are_equal_collect(
@@ -278,7 +307,10 @@ def test_mod_pmod_long_min_value():
'a % -1L'),
ansi_enabled_conf)
-@pytest.mark.parametrize('data_gen', _arith_data_gens_diff_precision_scale_and_no_neg_scale, ids=idfn)
+# pmod currently falls back for Decimal(precision=38)
+# https://github.com/NVIDIA/spark-rapids/issues/6336
+@pytest.mark.parametrize('data_gen', [decimal_gen_32bit, decimal_gen_64bit, _decimal_gen_18_0,
+ decimal_gen_128bit, _decimal_gen_30_2, _decimal_gen_36_5], ids=idfn)
@pytest.mark.parametrize('overflow_exp', [
'pmod(a, cast(0 as {}))',
'pmod(cast(-12 as {}), cast(0 as {}))',
@@ -314,7 +346,7 @@ def test_cast_neg_to_decimal_err():
ansi_enabled_conf,
exception_type + exception_content)
-@pytest.mark.parametrize('data_gen', _arith_data_gens_no_neg_scale, ids=idfn)
+@pytest.mark.parametrize('data_gen', _pmod_gens, ids=idfn)
def test_mod_pmod_by_zero_not_ansi(data_gen):
string_type = to_cast_string(data_gen.data_type)
assert_gpu_and_cpu_are_equal_collect(
@@ -431,7 +463,7 @@ def test_floor_scale_zero(data_gen):
@pytest.mark.skipif(is_before_spark_330(), reason='scale parameter in Floor function is not supported before Spark 3.3.0')
@allow_non_gpu('ProjectExec')
-@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
+@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale_38_0_overflow, ids=idfn)
def test_floor_scale_nonzero(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr('floor(a, -1)'), 'RoundFloor')
@@ -502,7 +534,7 @@ def test_shift_right_unsigned(data_gen):
'shiftrightunsigned(a, cast(null as INT))',
'shiftrightunsigned(a, b)'))
-_arith_data_gens_for_round = numeric_gens + _arith_decimal_gens_no_neg_scale + [
+_arith_data_gens_for_round = numeric_gens + _arith_decimal_gens_no_neg_scale_38_0_overflow + [
decimal_gen_32bit_neg_scale,
DecimalGen(precision=15, scale=-8),
DecimalGen(precision=30, scale=-5),
@@ -1050,16 +1082,15 @@ def test_unary_positive_day_time_interval():
lambda spark: unary_op_df(spark, DayTimeIntervalGen()).selectExpr('+a'))
@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
-@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [DoubleGen(min_exp=-3, max_exp=5, special_cases=[0.0])], ids=idfn)
+@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens_for_fallback + [DoubleGen(min_exp=-3, max_exp=5, special_cases=[0.0])], ids=idfn)
def test_day_time_interval_multiply_number(data_gen):
gen_list = [('_c1', DayTimeIntervalGen(min_value=timedelta(seconds=-20 * 86400), max_value=timedelta(seconds=20 * 86400))),
('_c2', data_gen)]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, gen_list).selectExpr("_c1 * _c2"))
-
@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
-@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [DoubleGen(min_exp=0, max_exp=5, special_cases=[])], ids=idfn)
+@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens_for_fallback + [DoubleGen(min_exp=0, max_exp=5, special_cases=[])], ids=idfn)
def test_day_time_interval_division_number_no_overflow1(data_gen):
gen_list = [('_c1', DayTimeIntervalGen(min_value=timedelta(seconds=-5000 * 365 * 86400), max_value=timedelta(seconds=5000 * 365 * 86400))),
('_c2', data_gen)]
@@ -1068,7 +1099,7 @@ def test_day_time_interval_division_number_no_overflow1(data_gen):
lambda spark: gen_df(spark, gen_list).selectExpr("_c1 / case when _c2 = 0 then cast(1 as {}) else _c2 end".format(to_cast_string(data_gen.data_type))))
@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
-@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [DoubleGen(min_exp=-5, max_exp=0, special_cases=[])], ids=idfn)
+@pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens_for_fallback + [DoubleGen(min_exp=-5, max_exp=0, special_cases=[])], ids=idfn)
def test_day_time_interval_division_number_no_overflow2(data_gen):
gen_list = [('_c1', DayTimeIntervalGen(min_value=timedelta(seconds=-20 * 86400), max_value=timedelta(seconds=20 * 86400))),
('_c2', data_gen)]
diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py
index cff9d4e7a07..8e37fb61d1c 100644
--- a/integration_tests/src/main/python/cache_test.py
+++ b/integration_tests/src/main/python/cache_test.py
@@ -26,7 +26,9 @@
enable_vectorized_confs = [{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "true"},
{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "false"}]
-_cache_decimal_gens = [decimal_gen_32bit, decimal_gen_64bit, decimal_gen_128bit]
+# Many tests sort the results, so use a sortable decimal generator as many Spark versions
+# fail to sort some large decimals properly.
+_cache_decimal_gens = [decimal_gen_32bit, decimal_gen_64bit, orderable_decimal_gen_128bit]
_cache_single_array_gens_no_null = [ArrayGen(gen) for gen in all_basic_gens_no_null + _cache_decimal_gens]
decimal_struct_gen= StructGen([['child0', sub_gen] for ind, sub_gen in enumerate(_cache_decimal_gens)])
@@ -166,7 +168,7 @@ def n_fold(spark):
pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]),
pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]),
BooleanGen(), DateGen(), TimestampGen(), decimal_gen_32bit, decimal_gen_64bit,
- decimal_gen_128bit] + _cache_single_array_gens_no_null_no_timestamp, ids=idfn)
+ orderable_decimal_gen_128bit] + _cache_single_array_gens_no_null_no_timestamp, ids=idfn)
@pytest.mark.parametrize('ts_write', ['TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS'])
@pytest.mark.parametrize('enable_vectorized', ['true', 'false'], ids=idfn)
@ignore_order
diff --git a/integration_tests/src/main/python/cast_test.py b/integration_tests/src/main/python/cast_test.py
index 2b15e68b2bf..8c5326c44ec 100644
--- a/integration_tests/src/main/python/cast_test.py
+++ b/integration_tests/src/main/python/cast_test.py
@@ -132,11 +132,15 @@ def test_cast_string_date_non_ansi():
lambda spark: spark.createDataFrame(data_rows, "a string").select(f.col('a').cast(DateType())),
conf={'spark.rapids.sql.hasExtendedYearValues': 'false'})
-def test_cast_string_ts_valid_format():
+@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}'),
+ StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9]'),
+ StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9].[0-9]{0,6}Z?')],
+ ids=idfn)
+def test_cast_string_ts_valid_format(data_gen):
# In Spark 3.2.0+ the valid format changed, and we cannot support all of the format.
# This provides values that are valid in all of those formats.
assert_gpu_and_cpu_are_equal_collect(
- lambda spark : unary_op_df(spark, StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}')).select(f.col('a').cast(TimestampType())),
+ lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(TimestampType())),
conf = {'spark.rapids.sql.hasExtendedYearValues': 'false',
'spark.rapids.sql.castStringToTimestamp.enabled': 'true'})
@@ -228,9 +232,15 @@ def test_cast_long_to_decimal_overflow():
# casting these types to string should be passed
basic_gens_for_cast_to_string = [ByteGen, ShortGen, IntegerGen, LongGen, StringGen, BooleanGen, DateGen, TimestampGen]
basic_array_struct_gens_for_cast_to_string = [f() for f in basic_gens_for_cast_to_string] + [null_gen] + decimal_gens
+
+# We currently do not generate the exact string as Spark for some decimal values of zero
+# https://github.com/NVIDIA/spark-rapids/issues/6339
basic_map_gens_for_cast_to_string = [
MapGen(f(nullable=False), f()) for f in basic_gens_for_cast_to_string] + [
- MapGen(DecimalGen(nullable=False), DecimalGen(precision=7, scale=3)), MapGen(DecimalGen(precision=7, scale=7, nullable=False), DecimalGen(precision=12, scale=2))]
+ MapGen(DecimalGen(nullable=False, special_cases=[]),
+ DecimalGen(precision=7, scale=3, special_cases=[])),
+ MapGen(DecimalGen(precision=7, scale=7, nullable=False, special_cases=[]),
+           DecimalGen(precision=12, scale=2, special_cases=[]))]
# GPU does not match CPU to casting these types to string, marked as xfail when testing
not_matched_gens_for_cast_to_string = [FloatGen, DoubleGen]
diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index 8b880985aa8..97b3be41ba8 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -15,8 +15,15 @@
import os
import pytest
import random
-from spark_init_internal import get_spark_i_know_what_i_am_doing
-from pyspark.sql.dataframe import DataFrame
+
+# TODO redo _spark stuff using fixtures
+#
+# Don't import pyspark / _spark directly in conftest globally
+# import as a plugin to do a lazy per-pytest-session initialization
+#
+pytest_plugins = [
+ 'spark_init_internal'
+]
_approximate_float_args = None
@@ -252,6 +259,7 @@ def get_worker_id(request):
@pytest.fixture
def spark_tmp_path(request):
+ from spark_init_internal import get_spark_i_know_what_i_am_doing
debug = request.config.getoption('debug_tmp_path')
ret = request.config.getoption('tmp_path')
if ret is None:
@@ -282,6 +290,7 @@ def get(self):
@pytest.fixture
def spark_tmp_table_factory(request):
+ from spark_init_internal import get_spark_i_know_what_i_am_doing
worker_id = get_worker_id(request)
table_id = random.getrandbits(31)
base_id = f'tmp_table_{worker_id}_{table_id}'
@@ -300,6 +309,7 @@ def _get_jvm(spark):
return spark.sparkContext._jvm
def spark_jvm():
+ from spark_init_internal import get_spark_i_know_what_i_am_doing
return _get_jvm(get_spark_i_know_what_i_am_doing())
class MortgageRunner:
@@ -309,6 +319,7 @@ def __init__(self, mortgage_format, mortgage_acq_path, mortgage_perf_path):
self.mortgage_perf_path = mortgage_perf_path
def do_test_query(self, spark):
+ from pyspark.sql.dataframe import DataFrame
jvm_session = _get_jvm_session(spark)
jvm = _get_jvm(spark)
acq = self.mortgage_acq_path
@@ -324,7 +335,7 @@ def do_test_query(self, spark):
raise AssertionError('Not Supported Format {}'.format(self.mortgage_format))
return DataFrame(df, spark.getActiveSession())
-
+
@pytest.fixture(scope="session")
def mortgage(request):
mortgage_format = request.config.getoption("mortgage_format")
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index db4b54b501f..6a8997e7167 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -21,7 +21,7 @@
from pyspark.sql.types import *
import pyspark.sql.functions as f
import random
-from spark_session import is_tz_utc
+from spark_session import is_tz_utc, is_before_spark_340
import sre_yield
import struct
from conftest import skip_unless_precommit_tests
@@ -215,17 +215,19 @@ def start(self, rand):
class DecimalGen(DataGen):
"""Generate Decimals, with some built in corner cases."""
- def __init__(self, precision=None, scale=None, nullable=True, special_cases=[]):
+ def __init__(self, precision=None, scale=None, nullable=True, special_cases=None):
if precision is None:
#Maximum number of decimal digits a Long can represent is 18
precision = 18
scale = 0
DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))
DECIMAL_MAX = Decimal(('9'* precision) + 'e' + str(-scale))
+ if (special_cases is None):
+ special_cases = [DECIMAL_MIN, DECIMAL_MAX, Decimal('0')]
super().__init__(DecimalType(precision, scale), nullable=nullable, special_cases=special_cases)
self.scale = scale
self.precision = precision
- pattern = "[0-9]{1,"+ str(precision) + "}e" + str(-scale)
+ pattern = "-?[0-9]{1,"+ str(precision) + "}e" + str(-scale)
self.base_strs = sre_yield.AllStrings(pattern, flags=0, charset=sre_yield.CHARSET, max_count=_MAX_CHOICES)
def __repr__(self):
@@ -928,10 +930,18 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
all_basic_gens_no_nan = [byte_gen, short_gen, int_gen, long_gen, FloatGen(no_nans=True), DoubleGen(no_nans=True),
string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]
+# Many Spark versions have issues sorting large decimals,
+# see https://issues.apache.org/jira/browse/SPARK-40089.
+orderable_decimal_gen_128bit = decimal_gen_128bit
+if is_before_spark_340():
+ orderable_decimal_gen_128bit = DecimalGen(precision=20, scale=2, special_cases=[])
+
+orderable_decimal_gens = [decimal_gen_32bit, decimal_gen_64bit, orderable_decimal_gen_128bit ]
+
# TODO add in some array generators to this once that is supported for sorting
# a selection of generators that should be orderable (sortable and compareable)
orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
- string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] + decimal_gens
+ string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] + orderable_decimal_gens
# TODO add in some array generators to this once that is supported for these operations
# a selection of generators that can be compared for equality
diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py
index 722febeb85e..a22ea24ac2b 100644
--- a/integration_tests/src/main/python/date_time_test.py
+++ b/integration_tests/src/main/python/date_time_test.py
@@ -215,6 +215,21 @@ def test_to_unix_timestamp(data_gen, ansi_enabled):
{'spark.sql.ansi.enabled': ansi_enabled})
+@pytest.mark.parametrize('time_zone', ["UTC", "UTC+0", "UTC-0", "GMT", "GMT+0", "GMT-0"], ids=idfn)
+@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
+def test_from_utc_timestamp(data_gen, time_zone):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))
+
+@allow_non_gpu('ProjectExec', 'FromUTCTimestamp')
+@pytest.mark.parametrize('time_zone', ["PST", "MST", "EST", "VST", "NST", "AST"], ids=idfn)
+@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
+def test_from_utc_timestamp_fallback(data_gen, time_zone):
+ assert_gpu_fallback_collect(
+ lambda spark : unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
+ 'ProjectExec')
+
+
@pytest.mark.parametrize('invalid,fmt', [
('2021-01/01', 'yyyy-MM-dd'),
('2021/01-01', 'yyyy/MM/dd'),
diff --git a/integration_tests/src/main/python/dpp_test.py b/integration_tests/src/main/python/dpp_test.py
index 124b1cd3b7e..ad73358704c 100644
--- a/integration_tests/src/main/python/dpp_test.py
+++ b/integration_tests/src/main/python/dpp_test.py
@@ -287,7 +287,10 @@ def setup_tables(spark):
" PARTITIONED BY (dt date, hr string, mins string) STORED AS PARQUET")
spark.sql("INSERT INTO {}(id,dt,hr,mins)".format(fact_table) +
" SELECT 'somevalue', to_date('2022-01-01'), '11', '59'")
- with_cpu_session(setup_tables)
+ with_cpu_session(setup_tables, conf={
+ "hive.exec.dynamic.partition" : "true",
+ "hive.exec.dynamic.partition.mode" : "nonstrict"
+ })
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.sql("SELECT COUNT(*) AS cnt FROM {} f".format(fact_table) +
" LEFT JOIN (SELECT *, " +
diff --git a/integration_tests/src/main/python/expand_exec_test.py b/integration_tests/src/main/python/expand_exec_test.py
index 8974e313f0a..d60b7859095 100644
--- a/integration_tests/src/main/python/expand_exec_test.py
+++ b/integration_tests/src/main/python/expand_exec_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2021, NVIDIA CORPORATION.
+# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +19,9 @@
from marks import ignore_order
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
-@ignore_order
+# Many Spark versions have issues sorting large decimals,
+# see https://issues.apache.org/jira/browse/SPARK-40089.
+@ignore_order(local=True)
def test_expand_exec(data_gen):
def op_df(spark, length=2048, seed=0):
return gen_df(spark, StructGen([
diff --git a/integration_tests/src/main/python/explain_test.py b/integration_tests/src/main/python/explain_test.py
index 53685b5e7c3..b84754a3d3f 100644
--- a/integration_tests/src/main/python/explain_test.py
+++ b/integration_tests/src/main/python/explain_test.py
@@ -20,6 +20,9 @@
from pyspark.sql.types import *
from spark_session import with_cpu_session, with_gpu_session
+# mark this test as ci_1 for mvn verify sanity check in pre-merge CI
+pytestmark = pytest.mark.premerge_ci_1
+
def create_df(spark, data_gen, left_length, right_length):
left = binary_op_df(spark, data_gen, length=left_length)
right = binary_op_df(spark, data_gen, length=right_length).withColumnRenamed("a", "r_a")\
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 90a9884359a..752a461f58f 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -417,11 +417,12 @@ def test_hash_reduction_sum_full_decimal(data_gen, conf):
@approximate_float
@ignore_order
@incompat
-@pytest.mark.parametrize('data_gen', _init_list_with_nans_and_no_nans + [_grpkey_short_mid_decimals, _grpkey_short_big_decimals], ids=idfn)
+@pytest.mark.parametrize('data_gen', _init_list_with_nans_and_no_nans + [_grpkey_short_mid_decimals,
+ _grpkey_short_big_decimals, _grpkey_short_very_big_decimals, _grpkey_short_full_decimals], ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
- lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.avg('b')),
+ lambda spark: gen_df(spark, data_gen, length=200).groupby('a').agg(f.avg('b')),
conf=conf
)
@@ -460,10 +461,27 @@ def test_exceptAll(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, data_gen, length=100).exceptAll(gen_df(spark, data_gen, length=100).filter('a != b')))
+# Spark fails to sort some decimal values due to overflow when calculating the sorting prefix.
+# See https://issues.apache.org/jira/browse/SPARK-40129
+# Since pivot orders by value, avoid generating these extreme values for this test.
+_pivot_gen_128bit = DecimalGen(precision=20, scale=2, special_cases=[])
+_pivot_big_decimals = [
+ ('a', RepeatSeqGen(DecimalGen(precision=32, scale=10, nullable=(True, 10.0)), length=50)),
+ ('b', _pivot_gen_128bit),
+ ('c', DecimalGen(precision=36, scale=5))]
+_pivot_short_big_decimals = [
+ ('a', RepeatSeqGen(short_gen, length=50)),
+ ('b', _pivot_gen_128bit),
+ ('c', decimal_gen_128bit)]
+
+_pivot_gens_with_decimals = _init_list_with_nans_and_no_nans + [
+ _grpkey_small_decimals, _pivot_big_decimals, _grpkey_short_mid_decimals,
+ _pivot_short_big_decimals, _grpkey_short_very_big_decimals,
+ _grpkey_short_very_big_neg_scale_decimals]
@approximate_float
@ignore_order(local=True)
@incompat
-@pytest.mark.parametrize('data_gen', _init_list_with_nans_and_no_nans_with_decimalbig, ids=idfn)
+@pytest.mark.parametrize('data_gen', _pivot_gens_with_decimals, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs_with_nans, params_markers_for_confs_nans), ids=idfn)
def test_hash_grpby_pivot(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
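The `_pivot_gen_128bit` comment above notes that pivot orders by the pivot values, which is where
SPARK-40129 can overflow the sort prefix for extreme decimals. A hedged standalone sketch of the
query shape under test (illustrative only; a local session and small literal data are assumed):

    from decimal import Decimal
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as f

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame(
        [(1, Decimal("12345678901234.56"), 1.0), (2, Decimal("-1.25"), 2.0)],
        "a int, b decimal(20,2), c double")
    # pivot() collects the distinct values of 'b' and sorts them before building
    # the output columns, so extreme decimal values stress Spark's sort path.
    df.groupby("a").pivot("b").agg(f.avg("c")).show()
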
@@ -604,6 +622,8 @@ def test_hash_pivot_groupby_duplicates_fallback(data_gen):
('a', RepeatSeqGen(LongGen(), length=20)),
('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op_nested + _array_of_array_gen]
+_all_basic_gens_with_all_nans_cases = all_basic_gens + [SetValuesGen(t, [math.nan, None]) for t in [FloatType(), DoubleType()]]
+
# very simple test for just a count on decimals 128 values until we can support more with them
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
@@ -637,18 +657,12 @@ def test_decimal128_min_max_group_by(data_gen):
.agg(f.min('b'), f.max('b')))
@ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
-def test_float_max_reduction_with_nan(data_gen):
- assert_gpu_and_cpu_are_equal_collect(
- lambda spark: unary_op_df(spark, data_gen).selectExpr('max(a)'))
-
-@ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
-def test_float_max_group_by_with_nan(data_gen):
+@pytest.mark.parametrize('data_gen', _all_basic_gens_with_all_nans_cases, ids=idfn)
+def test_min_max_group_by(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, byte_gen, data_gen)
.groupby('a')
- .agg(f.max('b')))
+ .agg(f.min('b'), f.max('b')))
# to avoid ordering issues with collect_list we do it all in a single task
@ignore_order(local=True)
@@ -1105,7 +1119,7 @@ def test_first_last_reductions_nested_types(data_gen):
lambda spark: unary_op_df(spark, data_gen).coalesce(1).selectExpr(
'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'))
-@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
+@pytest.mark.parametrize('data_gen', _all_basic_gens_with_all_nans_cases, ids=idfn)
def test_generic_reductions(data_gen):
local_conf = copy_and_update(_no_nans_float_conf, {'spark.sql.legacy.allowParameterlessCount': 'true'})
assert_gpu_and_cpu_are_equal_collect(
@@ -1133,7 +1147,7 @@ def test_count(data_gen):
'count(1)'),
conf = {'spark.sql.legacy.allowParameterlessCount': 'true'})
-@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn)
+@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_distinct_count_reductions(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).selectExpr(
@@ -1155,7 +1169,7 @@ def test_arithmetic_reductions(data_gen):
conf = _no_nans_float_conf)
@pytest.mark.parametrize('data_gen',
- non_nan_all_basic_gens + decimal_gens + _nested_gens,
+ all_basic_gens + decimal_gens + _nested_gens,
ids=idfn)
def test_collect_list_reductions(data_gen):
assert_gpu_and_cpu_are_equal_collect(
@@ -1205,7 +1219,9 @@ def test_sorted_groupby_first_last(data_gen):
lambda spark: agg_fn(gen_df(spark, gen_fn, num_slices=1)),
conf = {'spark.sql.shuffle.partitions': '1'})
-@ignore_order
+# Spark has a sorting bug with decimals, see https://issues.apache.org/jira/browse/SPARK-40129.
+# Have pytest do the sorting rather than Spark as a workaround.
+@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('count_func', [f.count, f.countDistinct])
def test_agg_count(data_gen, count_func):
@@ -1815,7 +1831,7 @@ def test_groupby_std_variance_partial_replace_fallback(data_gen,
# test min max on single level structure
#
gens_for_max_min = [byte_gen, short_gen, int_gen, long_gen,
- FloatGen(no_nans = True), DoubleGen(no_nans = True),
+ float_gen, double_gen,
string_gen, boolean_gen,
date_gen, timestamp_gen,
DecimalGen(precision=12, scale=2),
@@ -1852,4 +1868,4 @@ def test_min_max_for_single_level_struct(data_gen):
lambda spark : gen_df(spark, df_gen, length=1024),
"hash_agg_table",
'select min(a) from hash_agg_table',
- _no_nans_float_conf)
\ No newline at end of file
+ _no_nans_float_conf)
diff --git a/integration_tests/src/main/python/iceberg_test.py b/integration_tests/src/main/python/iceberg_test.py
index 76ee30fcbe8..4b4ccfacc10 100644
--- a/integration_tests/src/main/python/iceberg_test.py
+++ b/integration_tests/src/main/python/iceberg_test.py
@@ -35,6 +35,9 @@
ArrayGen(StructGen([['child0', string_gen], ['child1', double_gen], ['child2', int_gen]]))
] + iceberg_map_gens + decimal_gens ]
+rapids_reader_types = ['PERFILE', 'MULTITHREADED', 'COALESCING']
+
+
@allow_non_gpu("BatchScanExec")
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@@ -53,7 +56,7 @@ def setup_iceberg_table(spark):
@ignore_order(local=True)
@pytest.mark.skipif(is_before_spark_320() or is_databricks_runtime(),
reason="AQE+DPP not supported until Spark 3.2.0+ and AQE+DPP not supported on Databricks")
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_aqe_dpp(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -72,7 +75,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.parametrize("data_gens", iceberg_gens_list, ids=idfn)
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_parquet_read_round_trip(spark_tmp_table_factory, data_gens, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
table = spark_tmp_table_factory.get()
@@ -89,7 +92,7 @@ def setup_iceberg_table(spark):
@iceberg
@pytest.mark.parametrize("data_gens", [[long_gen]], ids=idfn)
@pytest.mark.parametrize("iceberg_format", ["orc", "avro"], ids=idfn)
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_unsupported_formats(spark_tmp_table_factory, data_gens, iceberg_format, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
table = spark_tmp_table_factory.get()
@@ -136,7 +139,7 @@ def setup_iceberg_table(spark):
marks=pytest.mark.skipif(is_before_spark_320(),
reason="Hadoop with Spark 3.1.x does not support lz4 by default")),
("zstd", None)], ids=idfn)
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_read_parquet_compression_codec(spark_tmp_table_factory, codec_info, reader_type):
codec, error_msg = codec_info
table = spark_tmp_table_factory.get()
@@ -160,7 +163,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.parametrize("key_gen", [int_gen, long_gen, string_gen, boolean_gen, date_gen, timestamp_gen, decimal_gen_64bit], ids=idfn)
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_read_partition_key(spark_tmp_table_factory, key_gen, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -176,7 +179,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_input_meta(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -194,7 +197,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_disorder_read_schema(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -274,7 +277,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.skipif(is_before_spark_320(), reason="Spark 3.1.x has a catalog bug precluding scope prefix in table names")
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_read_timetravel(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -298,7 +301,7 @@ def setup_snapshots(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.skipif(is_before_spark_320(), reason="Spark 3.1.x has a catalog bug precluding scope prefix in table names")
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_incremental_read(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -328,7 +331,7 @@ def setup_snapshots(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_reorder_columns(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -349,7 +352,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_rename_column(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -370,7 +373,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_column_names_swapped(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -393,7 +396,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_alter_column_type(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -416,7 +419,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_add_column(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -437,7 +440,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_remove_column(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -458,7 +461,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_add_partition_field(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -479,7 +482,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_drop_partition_field(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -500,7 +503,7 @@ def setup_iceberg_table(spark):
@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_v1_delete(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -517,7 +520,7 @@ def setup_iceberg_table(spark):
@iceberg
@pytest.mark.skipif(is_before_spark_320(), reason="merge-on-read not supported on Spark 3.1.x")
-@pytest.mark.parametrize('reader_type', ['PERFILE', 'MULTITHREADED'])
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_v2_delete_unsupported(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmpview = spark_tmp_table_factory.get()
@@ -534,3 +537,19 @@ def setup_iceberg_table(spark):
lambda spark : spark.sql("SELECT * FROM {}".format(table)).collect(),
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type}),
"UnsupportedOperationException: Delete filter is not supported")
+
+
+@iceberg
+@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
+def test_iceberg_parquet_read_with_input_file(spark_tmp_table_factory, reader_type):
+ table = spark_tmp_table_factory.get()
+ tmpview = spark_tmp_table_factory.get()
+ def setup_iceberg_table(spark):
+ df = binary_op_df(spark, long_gen)
+ df.createOrReplaceTempView(tmpview)
+ spark.sql("CREATE TABLE {} USING ICEBERG AS SELECT * FROM {}".format(table, tmpview))
+ with_cpu_session(setup_iceberg_table)
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : spark.sql("SELECT *, input_file_name() FROM {}".format(table)),
+ conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
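For reference, a hedged DataFrame-API equivalent of the SQL used in the new test above (the table
identifier is hypothetical and an existing `spark` session is assumed):

    import pyspark.sql.functions as f

    table = "spark_catalog.default.iceberg_example"  # hypothetical table name
    spark.table(table).withColumn("input_file", f.input_file_name()).show(truncate=False)
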
diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py
index 50c1b1ca70e..b64eab99d41 100644
--- a/integration_tests/src/main/python/join_test.py
+++ b/integration_tests/src/main/python/join_test.py
@@ -22,15 +22,14 @@
from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan
from spark_session import with_cpu_session, with_spark_session
-# Mark all tests in current file as premerge_ci_1 in order to be run in first k8s pod for parallel build premerge job
-pytestmark = [pytest.mark.premerge_ci_1, pytest.mark.nightly_resource_consuming_test]
+pytestmark = [pytest.mark.nightly_resource_consuming_test]
all_join_types = ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter']
all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
BooleanGen(), DateGen(), TimestampGen(), null_gen,
pytest.param(FloatGen(), marks=[incompat]),
- pytest.param(DoubleGen(), marks=[incompat])] + decimal_gens
+ pytest.param(DoubleGen(), marks=[incompat])] + orderable_decimal_gens
all_gen_no_nulls = [StringGen(nullable=False), ByteGen(nullable=False),
ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False),
@@ -73,7 +72,8 @@
# Types to use when running joins on small batches. Small batch joins can take a long time
# to run and are mostly redundant with the normal batch size test, so we only run these on a
# set of representative types rather than all types.
-join_small_batch_gens = [ StringGen(), IntegerGen(), decimal_gen_128bit ]
+
+join_small_batch_gens = [ StringGen(), IntegerGen(), orderable_decimal_gen_128bit ]
cartesian_join_small_batch_gens = join_small_batch_gens + [basic_struct_gen, ArrayGen(string_gen)]
_sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
diff --git a/integration_tests/src/main/python/orc_cast_test.py b/integration_tests/src/main/python/orc_cast_test.py
new file mode 100644
index 00000000000..6a84407a632
--- /dev/null
+++ b/integration_tests/src/main/python/orc_cast_test.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
+from data_gen import *
+from pyspark.sql.types import *
+from spark_session import with_cpu_session
+from orc_test import reader_opt_confs
+
+
+def create_orc(data_gen_list, data_path):
+    # generate a dataframe from 'data_gen_list' and write it as ORC to 'data_path'
+ with_cpu_session(
+ lambda spark: gen_df(spark, data_gen_list).write.orc(data_path)
+ )
+
+
+@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
+@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
+@pytest.mark.parametrize('to_type', ['boolean', 'tinyint', 'smallint', 'int', 'bigint'])
+def test_casting_among_integer_types(spark_tmp_path, reader_confs, v1_enabled_list, to_type):
+    # cast integral types to other integral types
+ int_gens = [boolean_gen] + integral_gens
+ gen_list = [('c' + str(i), gen) for i, gen in enumerate(int_gens)]
+ data_path = spark_tmp_path + '/ORC_DATA'
+ create_orc(gen_list, data_path)
+
+ # generate schema string like "c0 to_type, c1 to_type, ..., c4 to_type"
+ schema_str = " {}, ".join([x[0] for x in gen_list]) + " {}"
+ schema_str = schema_str.format(*([to_type] * len(gen_list)))
+ all_confs = copy_and_update(reader_confs,
+ {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: spark.read.schema(schema_str).orc(data_path),
+ conf=all_confs)
+
+
+@pytest.mark.parametrize('to_type', ['float', 'double', 'string', 'timestamp'])
+def test_casting_from_integer(spark_tmp_path, to_type):
+ orc_path = spark_tmp_path + '/orc_cast_integer'
+    # The Python 'datetime' module only supports years up to 9999, so we cap the Long values
+    # at 1e11. If a long value falls outside this range, pytest will throw an exception.
+ data_gen = [('boolean_col', boolean_gen), ('tinyint_col', byte_gen),
+ ('smallint_col', ShortGen(min_val=BYTE_MAX + 1)),
+ ('int_col', IntegerGen(min_val=SHORT_MAX + 1)),
+ ('bigint_col', LongGen(min_val=INT_MAX + 1, max_val=int(1e11))),
+ ('negint_col', IntegerGen(max_val=-1))]
+ create_orc(data_gen, orc_path)
+
+ schema_str = "boolean_col {}, tinyint_col {}, smallint_col {}, int_col {}, bigint_col {}, negint_col {}"
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: spark.read.schema(
+ schema_str.format(*([to_type] * len(data_gen)))).orc(orc_path)
+ )
+
+@pytest.mark.parametrize('overflow_long_gen', [LongGen(min_val=int(1e16)),
+ LongGen(max_val=int(-1e16))])
+@pytest.mark.parametrize('to_type', ['timestamp'])
+def test_casting_from_overflow_long(spark_tmp_path, overflow_long_gen, to_type):
+    # Timestamps (in microseconds) are stored as int64. Casting a long (int64) to a timestamp
+    # requires multiplying by 1e6 (or 1e3), which may overflow. This test verifies that the
+    # resulting 'ArithmeticException' is raised.
+ orc_path = spark_tmp_path + '/orc_cast_overflow_long'
+ create_orc([('long_column', overflow_long_gen)], orc_path)
+ schema_str = "long_column {}".format(to_type)
+ assert_gpu_and_cpu_error(
+ df_fun=lambda spark: spark.read.schema(schema_str).orc(orc_path).collect(),
+ conf={},
+ error_message="ArithmeticException"
+ )
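The tests above rely on ORC's cast-on-read behavior: data is written with one schema and read back
with another, and the reader performs the cast. A standalone PySpark sketch of that pattern,
assuming a local session and a temporary path (illustrative only):

    import tempfile
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    path = tempfile.mkdtemp() + "/orc_cast_example"
    # Write two int columns, then read them back as bigint; the ORC reader
    # performs the integral up-cast.
    spark.createDataFrame([(1, 2), (300, 40000)], "c0 int, c1 int").write.orc(path)
    spark.read.schema("c0 bigint, c1 bigint").orc(path).show()
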
diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py
index 204b4a127b9..7c0c775fc68 100644
--- a/integration_tests/src/main/python/orc_test.py
+++ b/integration_tests/src/main/python/orc_test.py
@@ -31,6 +31,14 @@ def read_orc_df(data_path):
def read_orc_sql(data_path):
return lambda spark : spark.sql('select * from orc.`{}`'.format(data_path))
+# ORC has an issue where timestamps read back are off by 1 second when the timestamp is
+# before the epoch (1970) and the microsecond value is between 0 and 1000.
+# See https://github.com/rapidsai/cudf/issues/11525.
+def get_orc_timestamp_gen(nullable=True):
+ return TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc), nullable=nullable)
+
+orc_timestamp_gen = get_orc_timestamp_gen()
+
# test with original orc file reader, the multi-file parallel reader for cloud
original_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'PERFILE'}
multithreaded_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'MULTITHREADED'}
@@ -51,7 +59,7 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl,
conf=all_confs)
# ORC does not support negative scale for decimal. So here is "decimal_gens_no_neg".
-# Otherwsie it will get the below exception.
+# Otherwise it will get the below exception.
# ...
#E Caused by: java.lang.IllegalArgumentException: Missing integer at
# 'struct<`_c0`:decimal(7,^-3),`_c1`:decimal(7,3),`_c2`:decimal(7,7),`_c3`:decimal(12,2)>'
@@ -60,8 +68,7 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl,
# ...
orc_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
- TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
- ] + decimal_gens
+ orc_timestamp_gen] + decimal_gens
orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)])
@@ -85,7 +92,7 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl,
orc_basic_map_gens = [simple_string_to_string_map_gen] + [MapGen(f(nullable=False), f()) for f in [
BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen,
- lambda nullable=True: TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc), nullable=nullable),
+ lambda nullable=True: get_orc_timestamp_gen(nullable),
lambda nullable=True: DateGen(start=date(1590, 1, 1), nullable=nullable),
lambda nullable=True: DecimalGen(precision=15, scale=1, nullable=nullable),
lambda nullable=True: DecimalGen(precision=36, scale=5, nullable=nullable)]]
@@ -152,7 +159,7 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
DateGen(start=date(1590, 1, 1)),
# Once https://github.com/NVIDIA/spark-rapids/issues/140 is fixed replace this with
# timestamp_gen
- TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]
+ orc_timestamp_gen]
@pytest.mark.order(2)
@pytest.mark.parametrize('orc_gen', orc_pred_push_gens, ids=idfn)
@@ -220,7 +227,7 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
# we should go with a more standard set of generators
orc_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
- TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]
+ orc_timestamp_gen]
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
first_data_path = spark_tmp_path + '/ORC_DATA/key=0/key2=20'
with_cpu_session(
@@ -287,7 +294,7 @@ def test_merge_schema_read(spark_tmp_path, v1_enabled_list, reader_confs):
# we should go with a more standard set of generators
orc_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
- TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]
+ orc_timestamp_gen]
first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
first_data_path = spark_tmp_path + '/ORC_DATA/key=0'
with_cpu_session(
@@ -664,26 +671,6 @@ def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, ag
conf=_orc_aggregate_pushdown_enabled_conf)
-@pytest.mark.parametrize('offset', [1,2,3,4], ids=idfn)
-@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
-@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
-def test_read_type_casting_integral(spark_tmp_path, offset, reader_confs, v1_enabled_list):
- int_gens = [boolean_gen] + integral_gens
- gen_list = [('c' + str(i), gen) for i, gen in enumerate(int_gens)]
- data_path = spark_tmp_path + '/ORC_DATA'
- with_cpu_session(
- lambda spark: gen_df(spark, gen_list).write.orc(data_path))
-
- # build the read schema by a left shift of int_gens
- shifted_int_gens = int_gens[offset:] + int_gens[:offset]
- rs_gen_list = [('c' + str(i), gen) for i, gen in enumerate(shifted_int_gens)]
- rs = StructGen(rs_gen_list, nullable=False).data_type
- all_confs = copy_and_update(reader_confs,
- {'spark.sql.sources.useV1SourceList': v1_enabled_list})
- assert_gpu_and_cpu_are_equal_collect(
- lambda spark: spark.read.schema(rs).orc(data_path),
- conf=all_confs)
-
def test_orc_read_count(spark_tmp_path):
data_path = spark_tmp_path + '/ORC_DATA'
orc_gens = [int_gen, string_gen, double_gen]
diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 4b09db2740b..2f6de5b7f48 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -19,12 +19,19 @@
from marks import allow_non_gpu
from pyspark.sql.types import *
import pyspark.sql.functions as f
+from spark_session import is_before_spark_340
+
+# Many Spark versions have issues sorting decimals.
+# https://issues.apache.org/jira/browse/SPARK-40089
+_orderable_not_null_big_decimal_gen = DecimalGen(precision=20, scale=2, nullable=False)
+if is_before_spark_340():
+ _orderable_not_null_big_decimal_gen = DecimalGen(precision=20, scale=2, nullable=False, special_cases=[])
orderable_not_null_gen = [ByteGen(nullable=False), ShortGen(nullable=False), IntegerGen(nullable=False),
LongGen(nullable=False), FloatGen(nullable=False), DoubleGen(nullable=False), BooleanGen(nullable=False),
TimestampGen(nullable=False), DateGen(nullable=False), StringGen(nullable=False),
DecimalGen(precision=7, scale=3, nullable=False), DecimalGen(precision=12, scale=2, nullable=False),
- DecimalGen(precision=20, scale=2, nullable=False)]
+ _orderable_not_null_big_decimal_gen]
@allow_non_gpu('SortExec', 'ShuffleExchangeExec', 'RangePartitioning', 'SortOrder')
@pytest.mark.parametrize('data_gen', [StringGen(nullable=False)], ids=idfn)
@@ -164,7 +171,8 @@ def test_single_nested_sort_in_part(data_gen, order, stable_sort):
conf=sort_conf)
orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
- boolean_gen, timestamp_gen, date_gen, string_gen, null_gen, StructGen([('child0', long_gen)])] + decimal_gens
+ boolean_gen, timestamp_gen, date_gen, string_gen, null_gen, StructGen([('child0', long_gen)])
+ ] + orderable_decimal_gens
@pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn)
def test_multi_orderby(data_gen):
assert_gpu_and_cpu_are_equal_collect(
diff --git a/integration_tests/src/main/python/spark_init_internal.py b/integration_tests/src/main/python/spark_init_internal.py
index e36cc3d282b..3ba6c390c0d 100644
--- a/integration_tests/src/main/python/spark_init_internal.py
+++ b/integration_tests/src/main/python/spark_init_internal.py
@@ -12,34 +12,89 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import os
+import re
+import sys
-try:
- import pyspark
-except ImportError as error:
- import findspark
- findspark.init()
- import pyspark
+logging.basicConfig(
+ format="%(asctime)s %(levelname)-8s %(message)s",
+ level=logging.INFO,
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
_CONF_ENV_PREFIX = 'PYSP_TEST_'
_EXECUTOR_ENV_PREFIX = 'spark_executorEnv_'
def env_for_conf(spark_conf_name):
- return _CONF_ENV_PREFIX + spark_conf_name.replace('.', '_')
+ # escape underscores
+ escaped_conf = spark_conf_name.replace('_', r'__')
+ return _CONF_ENV_PREFIX + escaped_conf.replace('.', '_')
def conf_for_env(env_name):
conf_key = env_name[len(_CONF_ENV_PREFIX):]
if conf_key.startswith(_EXECUTOR_ENV_PREFIX):
res = _EXECUTOR_ENV_PREFIX.replace('_', '.') + conf_key[len(_EXECUTOR_ENV_PREFIX):]
else:
- res = conf_key.replace('_', '.')
+ # replace standalone underscores
+ res1 = re.sub(r'(?&1 | tee testout; " \
+ "'LOCAL_JAR_PATH=%s SPARK_CONF=%s BASE_SPARK_VERSION=%s bash %s %s 2>&1 | tee testout; " \
"if [ ${PIPESTATUS[0]} -ne 0 ]; then false; else true; fi'" % \
(master_addr, params.private_key_file, params.jar_path, params.spark_conf, params.base_spark_pom_version,
params.script_dest, ' '.join(params.script_args))
diff --git a/jenkins/databricks/test.sh b/jenkins/databricks/test.sh
index 8cbbb4b526a..5cb660ad2a0 100755
--- a/jenkins/databricks/test.sh
+++ b/jenkins/databricks/test.sh
@@ -19,8 +19,11 @@ set -ex
LOCAL_JAR_PATH=${LOCAL_JAR_PATH:-''}
SPARK_CONF=${SPARK_CONF:-''}
-BASE_SPARK_VER=${BASE_SPARK_VER:-'3.1.2'}
-[[ -z $SPARK_SHIM_VER ]] && export SPARK_SHIM_VER=spark${BASE_SPARK_VER//.}db
+BASE_SPARK_VERSION=${BASE_SPARK_VERSION:-'3.1.2'}
+[[ -z $SPARK_SHIM_VER ]] && export SPARK_SHIM_VER=spark${BASE_SPARK_VERSION//.}db
+
+# install required packages
+sudo apt -y install zip unzip
# Try to use "cudf-udf" conda environment for the python cudf-udf tests.
if [ -d "/databricks/conda/envs/cudf-udf" ]; then
@@ -34,7 +37,7 @@ export SPARK_HOME=/databricks/spark
# change to not point at databricks confs so we don't conflict with their settings
export SPARK_CONF_DIR=$PWD
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark/:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip
-if [[ $BASE_SPARK_VER == "3.2.1" ]]
+if [[ $BASE_SPARK_VERSION == "3.2.1" ]]
then
# Databricks Koalas can conflict with the actual Pandas version, so put site packages first
export PYTHONPATH=/databricks/python3/lib/python3.8/site-packages:$PYTHONPATH
@@ -65,7 +68,7 @@ if [ -n "$SPARK_CONF" ]; then
fi
IS_SPARK_311_OR_LATER=0
-[[ "$(printf '%s\n' "3.1.1" "$BASE_SPARK_VER" | sort -V | head -n1)" = "3.1.1" ]] && IS_SPARK_311_OR_LATER=1
+[[ "$(printf '%s\n' "3.1.1" "$BASE_SPARK_VERSION" | sort -V | head -n1)" = "3.1.1" ]] && IS_SPARK_311_OR_LATER=1
# TEST_MODE
@@ -78,7 +81,7 @@ TEST_TYPE="nightly"
PCBS_CONF="com.nvidia.spark.ParquetCachedBatchSerializer"
ICEBERG_VERSION=${ICEBERG_VERSION:-0.13.2}
-ICEBERG_SPARK_VER=$(echo $BASE_SPARK_VER | cut -d. -f1,2)
+ICEBERG_SPARK_VER=$(echo $BASE_SPARK_VERSION | cut -d. -f1,2)
# Classloader config is here to work around classloader issues with
# --packages in distributed setups, should be fixed by
# https://github.com/NVIDIA/spark-rapids/pull/5646
diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh
index 256f5fdf7e4..228fb6f51fa 100755
--- a/jenkins/spark-premerge-build.sh
+++ b/jenkins/spark-premerge-build.sh
@@ -48,9 +48,7 @@ mvn_verify() {
$MVN_INSTALL_CMD -DskipTests -Dbuildver=313
[[ $BUILD_MAINTENANCE_VERSION_SNAPSHOTS == "true" ]] && $MVN_INSTALL_CMD -Dbuildver=314
- # don't skip tests
- env -u SPARK_HOME $MVN_CMD -U -B $MVN_URM_MIRROR -Dbuildver=320 clean install $MVN_BUILD_ARGS \
- -Dpytest.TEST_TAGS='' -pl '!tools'
+ $MVN_INSTALL_CMD -DskipTests -Dbuildver=320
# enable UTF-8 for regular expression tests
env -u SPARK_HOME LC_ALL="en_US.UTF-8" $MVN_CMD $MVN_URM_MIRROR -Dbuildver=320 test $MVN_BUILD_ARGS \
-Dpytest.TEST_TAGS='' -pl '!tools' \
@@ -130,15 +128,17 @@ ci_2() {
$MVN_CMD -U -B $MVN_URM_MIRROR clean package $MVN_BUILD_ARGS -DskipTests=true
export TEST_TAGS="not premerge_ci_1"
export TEST_TYPE="pre-commit"
- export TEST_PARALLEL=4
- # separate process to avoid OOM kill
- TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh
- TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh
- TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \
- ./integration_tests/run_pyspark_from_build.sh
+ export TEST_PARALLEL=5
+ ./integration_tests/run_pyspark_from_build.sh
+ # enable avro test separately
INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh
# export 'LC_ALL' to set locale with UTF-8 so regular expressions are enabled
LC_ALL="en_US.UTF-8" TEST="regexp_test.py" ./integration_tests/run_pyspark_from_build.sh
+
+ # put some mvn tests here to balance durations of parallel stages
+ echo "Run mvn package..."
+ env -u SPARK_HOME $MVN_CMD -U -B $MVN_URM_MIRROR -Dbuildver=320 clean package $MVN_BUILD_ARGS \
+ -Dpytest.TEST_TAGS='' -pl '!tools'
}
@@ -183,7 +183,10 @@ export SPARK_HOME="$ARTF_ROOT/spark-$SPARK_VER-bin-hadoop3.2"
export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"
tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \
rm -f $SPARK_HOME.tgz
-export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark/:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip
+# copy python path libs to container /tmp instead of workspace to avoid ephemeral PVC issue
+TMP_PYTHON=/tmp/$(date +"%Y%m%d")
+rm -rf $TMP_PYTHON && cp -r $SPARK_HOME/python $TMP_PYTHON
+export PYTHONPATH=$TMP_PYTHON/python:$TMP_PYTHON/python/pyspark/:$TMP_PYTHON/python/lib/py4j-0.10.9-src.zip
case $BUILD_TYPE in
diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh
index 5ffeddc7619..ea480d93e47 100755
--- a/jenkins/spark-tests.sh
+++ b/jenkins/spark-tests.sh
@@ -110,7 +110,11 @@ export SPARK_HOME="$ARTF_ROOT/spark-$SPARK_VER-bin-hadoop3.2"
export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"
tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \
rm -f $SPARK_HOME.tgz
-export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark/:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip
+# copy python path libs to container /tmp instead of workspace to avoid ephemeral PVC issue
+TMP_PYTHON=/tmp/$(date +"%Y%m%d")
+rm -rf $TMP_PYTHON && cp -r $SPARK_HOME/python $TMP_PYTHON
+export PYTHONPATH=$TMP_PYTHON/python:$TMP_PYTHON/python/pyspark/:$TMP_PYTHON/python/lib/py4j-0.10.9-src.zip
+
# Extract 'value' from conda config string 'key: value'
CONDA_ROOT=`conda config --show root_prefix | cut -d ' ' -f2`
PYTHON_VER=`conda config --show default_python | cut -d ' ' -f2`
@@ -195,9 +199,6 @@ run_iceberg_tests() {
# Iceberg does not support Spark 3.3+ yet
if [[ "$ICEBERG_SPARK_VER" < "3.3" ]]; then
- # Classloader config is here to work around classloader issues with
- # --packages in distributed setups, should be fixed by
- # https://github.com/NVIDIA/spark-rapids/pull/5646
SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $SEQ_CONF \
--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPARK_VER}_2.12:${ICEBERG_VERSION} \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/OrcCastingShims.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/OrcCastingShims.scala
new file mode 100644
index 00000000000..fe914811df2
--- /dev/null
+++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/OrcCastingShims.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.shims
+
+import ai.rapids.cudf.{ColumnView, DType, Scalar}
+import com.nvidia.spark.rapids.GpuOrcScan.{testLongMultiplicationOverflow, withResource}
+
+object OrcCastingShims {
+ /**
+   * Cast a ColumnView of an integral type to a timestamp (in microseconds).
+   * @param col The column view of an integral type.
+   * @param fromType BOOL8 or INT8/16/32/64
+   * @return A new timestamp column vector.
+ */
+ def castIntegerToTimestamp(col: ColumnView, fromType: DType): ColumnView = {
+ fromType match {
+ case DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32 =>
+        // From spark311 until spark314 (exclusive), Spark treats these integers as
+        // milliseconds.
+        // cuDF requires casting to Long first; then the Long can be cast to a timestamp (in microseconds).
+        // The CPU ORC casting code converts 'integer -> milliseconds -> microseconds'.
+ withResource(col.castTo(DType.INT64)) { longs =>
+ withResource(Scalar.fromLong(1000L)) { thousand =>
+ withResource(longs.mul(thousand)) { milliSeconds =>
+ milliSeconds.castTo(DType.TIMESTAMP_MICROSECONDS)
+ }
+ }
+ }
+ case DType.INT64 =>
+        // Overflow checking is needed here: the max value of INT64 is about 9 * 1e18, and
+        // multiplying the value by 1000 to scale it to microseconds may cause a long
+        // integer overflow.
+        // If neither 'testLongMultiplicationOverflow' call throws, then casting 'col' to
+        // TIMESTAMP_MICROSECONDS will not overflow a long.
+ if (col.max() != null) {
+ testLongMultiplicationOverflow(col.max().getLong, 1000L)
+ }
+ if (col.min() != null) {
+ testLongMultiplicationOverflow(col.min().getLong, 1000L)
+ }
+ withResource(Scalar.fromLong(1000L)) { thousand =>
+ withResource(col.mul(thousand)) { milliSeconds =>
+ milliSeconds.castTo(DType.TIMESTAMP_MICROSECONDS)
+ }
+ }
+ }
+ }
+}
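A Python-only illustration (not the plugin code) of the conversion above for Spark 3.1.x: the
integral value is scaled by 1000 to microseconds, and for 64-bit inputs the column min/max are
checked first so the multiplication cannot silently overflow a long:

    LONG_MAX = 2**63 - 1
    LONG_MIN = -2**63

    def check_long_multiplication(value, factor=1000):
        # Mirrors the intent of testLongMultiplicationOverflow: raise if the
        # product cannot be represented as a 64-bit signed integer.
        if not (LONG_MIN <= value * factor <= LONG_MAX):
            raise ArithmeticError("long overflow: %d * %d" % (value, factor))

    def int64_to_micros_spark31x(values):
        # Guard with the column min/max, then scale every value by 1000.
        if values:
            check_long_multiplication(max(values))
            check_long_multiplication(min(values))
        return [v * 1000 for v in values]

    print(int64_to_micros_spark31x([0, 1_000, -1_000]))  # [0, 1000000, -1000000]
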
diff --git a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala
index 4496ea9c93c..648a498893e 100644
--- a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala
+++ b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/Spark31XShims.scala
@@ -262,26 +262,7 @@ abstract class Spark31XShims extends SparkShims with Spark31Xuntil33XShims with
TypeSig.cpuNumeric))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagAggForGpu(): Unit = {
- // For Decimal Average the SUM adds a precision of 10 to avoid overflowing
- // then it divides by the count with an output scale that is 4 more than the input
- // scale. With how our divide works to match Spark, this means that we will need a
- // precision of 5 more. So 38 - 10 - 5 = 23
- val dataType = a.child.dataType
- dataType match {
- case dt: DecimalType =>
- if (dt.precision > 23) {
- if (conf.needDecimalGuarantees) {
- willNotWorkOnGpu("GpuAverage cannot guarantee proper overflow checks for " +
- s"a precision large than 23. The current precision is ${dt.precision}")
- } else {
- logWarning("Decimal overflow guarantees disabled for " +
- s"Average(${a.child.dataType}) produces ${dt} with an " +
- s"intermediate precision of ${dt.precision + 15}")
- }
- }
- case _ => // NOOP
- }
- GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
+ GpuOverrides.checkAndTagFloatAgg(a.child.dataType, conf, this)
}
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala
index f09873251ca..b1328b2502b 100644
--- a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala
+++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala
@@ -133,26 +133,7 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
TypeSig.cpuNumeric))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagAggForGpu(): Unit = {
- // For Decimal Average the SUM adds a precision of 10 to avoid overflowing
- // then it divides by the count with an output scale that is 4 more than the input
- // scale. With how our divide works to match Spark, this means that we will need a
- // precision of 5 more. So 38 - 10 - 5 = 23
- val dataType = a.child.dataType
- dataType match {
- case dt: DecimalType =>
- if (dt.precision > 23) {
- if (conf.needDecimalGuarantees) {
- willNotWorkOnGpu("GpuAverage cannot guarantee proper overflow checks for " +
- s"a precision large than 23. The current precision is ${dt.precision}")
- } else {
- logWarning("Decimal overflow guarantees disabled for " +
- s"Average(${a.child.dataType}) produces ${dt} with an " +
- s"intermediate precision of ${dt.precision + 15}")
- }
- }
- case _ => // NOOP
- }
- GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
+ GpuOverrides.checkAndTagFloatAgg(a.child.dataType, conf, this)
}
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/OrcCastingShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/OrcCastingShims.scala
new file mode 100644
index 00000000000..b793a683e04
--- /dev/null
+++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/OrcCastingShims.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.shims
+
+import ai.rapids.cudf.{ColumnView, DType, Scalar}
+import com.nvidia.spark.rapids.GpuOrcScan.{testLongMultiplicationOverflow, withResource}
+
+object OrcCastingShims {
+ /**
+   * Cast a ColumnView of an integral type to a timestamp (in microseconds).
+   * @param col The column view of an integral type.
+   * @param fromType BOOL8 or INT8/16/32/64
+   * @return A new timestamp column vector.
+ */
+ def castIntegerToTimestamp(col: ColumnView, fromType: DType): ColumnView = {
+ fromType match {
+ case DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32 =>
+        // From spark320 onward, Spark treats these integers as seconds.
+        withResource(col.castTo(DType.INT64)) { longs =>
+          // On the CPU, ORC assumes the integer value is in seconds and returns a timestamp
+          // in microseconds, so we multiply by 1e6 here.
+ withResource(Scalar.fromLong(1000000L)) { value =>
+ withResource(longs.mul(value)) { microSeconds =>
+ microSeconds.castTo(DType.TIMESTAMP_MICROSECONDS)
+ }
+ }
+ }
+
+ case DType.INT64 =>
+        // The CPU ORC casting code converts 'integer -> milliseconds -> microseconds'.
+ withResource(Scalar.fromLong(1000L)) { thousand =>
+ withResource(col.mul(thousand)) { milliSeconds =>
+          // Long overflow must be checked here. If the milliseconds cannot be converted to
+          // microseconds, testLongMultiplicationOverflow will throw an exception.
+ if (milliSeconds.max() != null) {
+ testLongMultiplicationOverflow(milliSeconds.max().getLong, 1000L)
+ }
+ if (milliSeconds.min() != null) {
+ testLongMultiplicationOverflow(milliSeconds.min().getLong, 1000L)
+ }
+ withResource(milliSeconds.mul(thousand)) { microSeconds =>
+ microSeconds.castTo(DType.TIMESTAMP_MICROSECONDS)
+ }
+ }
+ }
+ }
+ }
+}
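A companion Python-only sketch (again illustrative, not the plugin code) of the Spark 3.2.0+
behavior above: small integral types are treated as seconds and scaled straight to microseconds,
while 64-bit values are first scaled to milliseconds and that intermediate result is
overflow-checked before the final multiply by 1000:

    LONG_MAX = 2**63 - 1
    LONG_MIN = -2**63

    def small_int_seconds_to_micros(values):
        # BOOL8/INT8/16/32: the value is seconds, so scale straight to microseconds.
        return [v * 1_000_000 for v in values]

    def int64_to_micros_spark320(values):
        # INT64: value -> milliseconds, then check the milliseconds before the
        # final milliseconds -> microseconds multiply.
        millis = [v * 1000 for v in values]
        for bound in ([max(millis), min(millis)] if millis else []):
            if not (LONG_MIN <= bound * 1000 <= LONG_MAX):
                raise ArithmeticError("long overflow: %d * 1000" % bound)
        return [m * 1000 for m in millis]

    print(small_int_seconds_to_micros([1, 2]))   # [1000000, 2000000]
    print(int64_to_micros_spark320([1, -1]))     # [1000000, -1000000]
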
diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
index ea175b2e6d8..90137bdacf6 100644
--- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
+++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
@@ -141,26 +141,7 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
TypeSig.numericAndInterval + TypeSig.NULL))),
(a, conf, p, r) => new AggExprMeta[Average](a, conf, p, r) {
override def tagAggForGpu(): Unit = {
- // For Decimal Average the SUM adds a precision of 10 to avoid overflowing
- // then it divides by the count with an output scale that is 4 more than the input
- // scale. With how our divide works to match Spark, this means that we will need a
- // precision of 5 more. So 38 - 10 - 5 = 23
- val dataType = a.child.dataType
- dataType match {
- case dt: DecimalType =>
- if (dt.precision > 23) {
- if (conf.needDecimalGuarantees) {
- willNotWorkOnGpu("GpuAverage cannot guarantee proper overflow checks for " +
- s"a precision large than 23. The current precision is ${dt.precision}")
- } else {
- logWarning("Decimal overflow guarantees disabled for " +
- s"Average(${a.child.dataType}) produces $dt with an " +
- s"intermediate precision of ${dt.precision + 15}")
- }
- }
- case _ => // NOOP
- }
- GpuOverrides.checkAndTagFloatAgg(dataType, conf, this)
+ GpuOverrides.checkAndTagFloatAgg(a.child.dataType, conf, this)
}
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProvider.scala b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProvider.scala
index cd9cc9666c0..b17a70d303f 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProvider.scala
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProvider.scala
@@ -25,6 +25,8 @@ trait IcebergProvider {
def isSupportedScan(scan: Scan): Boolean
def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]]
+
+ def copyScanWithInputFileTrue(scan: Scan): Scan
}
object IcebergProvider {
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProviderImpl.scala b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProviderImpl.scala
index 9d440885e91..ae855214f2e 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProviderImpl.scala
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProviderImpl.scala
@@ -66,4 +66,11 @@ class IcebergProviderImpl extends IcebergProvider {
ClassTag(cpuIcebergScanClass))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap
}
+
+ override def copyScanWithInputFileTrue(scan: Scan): Scan = scan match {
+ case icebergBatchScan: GpuSparkBatchQueryScan =>
+ icebergBatchScan.copyWithInputFileTrue();
+ case _ =>
+ throw new RuntimeException(s"Unsupported scan type: ${scan.getClass.getSimpleName}")
+ }
}
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
index 9b958cbaa8b..8755fa27289 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
@@ -38,10 +38,12 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.rapids.InputFileUtils;
+import org.apache.spark.sql.rapids.execution.TrampolineUtil;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.Tuple2;
@@ -49,15 +51,12 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
-/** The wrapper of the GPU multi-threaded and coalescing(TBD) reader for Iceberg */
+/** The wrapper of the GPU multi-threaded and coalescing reader for Iceberg */
 class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
private static final Logger LOG = LoggerFactory.getLogger(GpuMultiFileBatchReader.class);
-  private final Map<String, Tuple2<Map<Integer, ?>, Schema>> constsSchemaMap =
- Maps.newConcurrentMap();
   private final LinkedHashMap<String, FileScanTask> files;
private final Schema expectedSchema;
private final boolean caseSensitive;
@@ -75,7 +74,7 @@ class GpuMultiFileBatchReader extends BaseDataReader {
private boolean needNext = true;
private boolean isBatchPending;
// lazy variables
- private FilePartitionReaderBase rapidsReader = null;
+  private CloseableIterator<ColumnarBatch> batchReader = null;
GpuMultiFileBatchReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive, Configuration conf, int maxBatchSizeRows, long maxBatchSizeBytes,
@@ -104,38 +103,28 @@ class GpuMultiFileBatchReader extends BaseDataReader {
@Override
public ColumnarBatch get() {
- if (rapidsReader == null) {
+ if (batchReader == null) {
// Not initialized, return null to align with PerFile reader.
return null;
}
needNext = true;
isBatchPending = false;
- // The same post-process with PerFile reader.
- try (ColumnarBatch batch = rapidsReader.get()) {
- // The Rapids reader should already set the current file.
- String curFile = InputFileUtils.getCurInputFilePath();
- Tuple2