From 34c9aea4e374b9a701d147ada2f62c26274f1c1c Mon Sep 17 00:00:00 2001
From: Ahmed Hussein <50450311+amahussein@users.noreply.github.com>
Date: Thu, 2 Jun 2022 11:38:15 -0500
Subject: [PATCH] Update GPU ORC statistics write support (#5715)
* Update GPU ORC statistics write support
Signed-off-by: Ahmed Hussein (amahussein)
* clarify doc that the limitation applies to ORC files written prior to 22.06
Signed-off-by: Ahmed Hussein (amahussein)
* nit: remove emphasized 'prior'
Signed-off-by: Ahmed Hussein (amahussein)
---
docs/compatibility.md | 14 ++++----
integration_tests/src/main/python/orc_test.py | 32 +++++++------------
2 files changed, 17 insertions(+), 29 deletions(-)
diff --git a/docs/compatibility.md b/docs/compatibility.md
index 95ada412650..ca59894767b 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -384,7 +384,7 @@ The plugin supports reading `uncompressed`, `snappy` and `zlib` ORC files and wr
and `snappy` ORC files. At this point, the plugin does not have the ability to fall back to the
CPU when reading an unsupported compression format, and will error out in that case.
-### Push Down Aggreates for ORC
+### Push Down Aggregates for ORC
Spark-3.3.0+ pushes down certain aggregations (`MIN`/`MAX`/`COUNT`) into ORC when the user-config
`spark.sql.orc.aggregatePushdown` is set to true.
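+
+For example, the pushdown can be enabled on Spark 3.3.0+ as in this minimal sketch (the table
+path is hypothetical):
+
+```python
+# Use the V2 ORC source (the pushdown requires DataSource V2) and enable the optimization
+spark.conf.set("spark.sql.sources.useV1SourceList", "")
+spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
+# MIN/MAX/COUNT can now be answered from ORC file statistics
+spark.read.orc("/data/example.orc").selectExpr("MAX(id)").show()
+```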
@@ -414,16 +414,14 @@ file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.
**Limitations With RAPIDS**
-RAPIDS does not support whole file statistics in ORC file. We are working with
-[CUDF](https://github.com/rapidsai/cudf) to support writing statistics and you can track it
-[here](https://github.com/rapidsai/cudf/issues/5826).
+In releases prior to 22.06, RAPIDS does not support writing whole-file statistics in ORC files.
*Writing ORC Files*
-Without CUDF support to file statistics, all ORC files written by
-the GPU are incompatible with the optimization causing an ORC read-job to fail as described above.
-In order to prevent job failures, `spark.sql.orc.aggregatePushdown` should be disabled while reading ORC files
-that were written by the GPU.
+If you are using a release prior to 22.06, where CUDF does not support writing file statistics, then the
+ORC files written by the GPU are incompatible with the optimization, causing an ORC read job to fail as
+described above. To prevent job failures in releases prior to 22.06, `spark.sql.orc.aggregatePushdown`
+should be disabled while reading ORC files that were written by the GPU.
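+
+A minimal sketch of the workaround, assuming a `SparkSession` named `spark` and a hypothetical
+path to GPU-written ORC data:
+
+```python
+# Pre-22.06 workaround: disable the pushdown before reading GPU-written ORC files
+spark.conf.set("spark.sql.orc.aggregatePushdown", "false")
+spark.read.orc("/data/gpu_written.orc").selectExpr("COUNT(*)").show()
+```
+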
*Reading ORC Files*
diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py
index 8ca21623610..67b1408de09 100644
--- a/integration_tests/src/main/python/orc_test.py
+++ b/integration_tests/src/main/python/orc_test.py
@@ -561,10 +561,16 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li
_aggregate_orc_list_col_partition = ['COUNT']
_aggregate_orc_list_no_col_partition = ['MAX', 'MIN']
_aggregate_orc_list = _aggregate_orc_list_col_partition + _aggregate_orc_list_no_col_partition
-_orc_aggregate_pushdown_enabled_conf = {'spark.sql.orc.aggregatePushdown': 'true',
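+# Enable GPU ORC writes so the test data (and its file statistics) is produced on the GPU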
+_orc_aggregate_pushdown_enabled_conf = {'spark.rapids.sql.format.orc.write.enabled': 'true',
+ 'spark.sql.orc.aggregatePushdown': 'true',
"spark.sql.sources.useV1SourceList": ""}
def _do_orc_scan_with_agg(spark, path, agg):
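+    # Write a small unpartitioned ORC dataset, then read it back with the aggregate applied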
+ spark.range(10).selectExpr("id", "id % 3 as p").write.mode("overwrite").orc(path)
+ return spark.read.orc(path).selectExpr('{}(p)'.format(agg))
+
+def _do_orc_scan_with_agg_on_partitioned_column(spark, path, agg):
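+    # Same as above, but the dataset is partitioned by the aggregated column p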
+ spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").orc(path)
return spark.read.orc(path).selectExpr('{}(p)'.format(agg))
@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@@ -582,11 +588,7 @@ def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, aggregate):
| MAX | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_00.orc'
- # GPU ORC write with statistics is not correctly working.
- # Create ORC file in CPU session as a workaround
- with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
- .orc(data_path))
-
+
# fallback to CPU
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
@@ -609,16 +611,10 @@ def test_orc_scan_with_aggregate_pushdown_on_col_partition(spark_tmp_path, aggre
| COUNT | Y | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_01.orc'
- # GPU ORC write with statistics is not correctly working.
- # Create ORC file in CPU session as a workaround
- # Partition column P
- with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
- .partitionBy("p")
- .orc(data_path))
-
+
# fallback to CPU only if aggregate is COUNT
assert_cpu_and_gpu_are_equal_collect_with_capture(
- lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
+ lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=_orc_aggregate_pushdown_enabled_conf)
@@ -637,14 +633,8 @@ def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, ag
| MAX | Y | N |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_02.orc'
- # GPU ORC write with statistics is not correctly working.
- # Create ORC file in CPU session as a workaround
- # Partition column P
- with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
- .partitionBy("p")
- .orc(data_path))
# should not fallback to CPU
assert_gpu_and_cpu_are_equal_collect(
- lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
+ lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
conf=_orc_aggregate_pushdown_enabled_conf)
\ No newline at end of file