Update GPU ORC statistics write support (#5715)
* Update GPU ORC statistics write support

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

* clarify doc for ORC files written prior to 22.06

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

* nit: remove emphasized 'prior'

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
amahussein authored Jun 2, 2022
1 parent 230ac0a commit 34c9aea
Showing 2 changed files with 17 additions and 29 deletions.
14 changes: 6 additions & 8 deletions docs/compatibility.md
@@ -384,7 +384,7 @@ The plugin supports reading `uncompressed`, `snappy` and `zlib` ORC files and wr
and `snappy` ORC files. At this point, the plugin does not have the ability to fall back to the
CPU when reading an unsupported compression format, and will error out in that case.
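For illustration, a minimal PySpark sketch of writing one of the codecs the plugin supports (the output path is illustrative):

```python
from pyspark.sql import SparkSession

# Minimal sketch (illustrative path): write snappy-compressed ORC, one of the
# two codecs (`uncompressed`, `snappy`) the plugin can write on the GPU.
spark = SparkSession.builder.getOrCreate()
df = spark.range(100).selectExpr("id", "id % 3 as p")
df.write.mode("overwrite").option("compression", "snappy").orc("/tmp/orc_snappy")
```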

-### Push Down Aggreates for ORC
+### Push Down Aggregates for ORC

Spark-3.3.0+ pushes down certain aggregations (`MIN`/`MAX`/`COUNT`) into ORC when the user-config
`spark.sql.orc.aggregatePushdown` is set to true.
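A minimal sketch of turning the optimization on for Spark 3.3.0+ (paths and data are illustrative; clearing `spark.sql.sources.useV1SourceList` selects the V2 ORC source, mirroring the test config further down):

```python
from pyspark.sql import SparkSession

# Minimal sketch, Spark 3.3.0+ (illustrative path): enable ORC aggregate
# pushdown so MIN/MAX/COUNT can be answered from ORC file statistics.
spark = (SparkSession.builder
         .config("spark.sql.orc.aggregatePushdown", "true")
         .config("spark.sql.sources.useV1SourceList", "")  # use the V2 ORC source
         .getOrCreate())
spark.range(10).selectExpr("id", "id % 3 as p").write.mode("overwrite").orc("/tmp/orc_agg")
spark.read.orc("/tmp/orc_agg").selectExpr("MIN(p)").show()
```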
@@ -414,16 +414,14 @@ file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.

**Limitations With RAPIDS**

-RAPIDS does not support whole file statistics in ORC file. We are working with
-[CUDF](https://github.com/rapidsai/cudf) to support writing statistics and you can track it
-[here](https://github.com/rapidsai/cudf/issues/5826).
+RAPIDS does not support whole-file statistics in ORC files in releases prior to 22.06.

*Writing ORC Files*

-Without CUDF support to file statistics, all ORC files written by
-the GPU are incompatible with the optimization causing an ORC read-job to fail as described above.
-In order to prevent job failures, `spark.sql.orc.aggregatePushdown` should be disabled while reading ORC files
-that were written by the GPU.
+If you are using a release prior to 22.06, where CUDF does not support writing file statistics, then ORC files
+written by the GPU are incompatible with this optimization, causing an ORC read job to fail as described above.
+To prevent such job failures in releases prior to 22.06, `spark.sql.orc.aggregatePushdown` should be disabled
+while reading ORC files that were written by the GPU.
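A minimal sketch of this workaround for pre-22.06 releases (the path is illustrative):

```python
from pyspark.sql import SparkSession

# Minimal sketch (illustrative path): disable aggregate pushdown before reading
# ORC files that a pre-22.06 GPU plugin wrote without file statistics.
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.orc.aggregatePushdown", "false")
spark.read.orc("/tmp/gpu_written_orc").selectExpr("COUNT(p)").show()
```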

*Reading ORC Files*

32 changes: 11 additions & 21 deletions integration_tests/src/main/python/orc_test.py
@@ -561,10 +561,16 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li
_aggregate_orc_list_col_partition = ['COUNT']
_aggregate_orc_list_no_col_partition = ['MAX', 'MIN']
_aggregate_orc_list = _aggregate_orc_list_col_partition + _aggregate_orc_list_no_col_partition
-_orc_aggregate_pushdown_enabled_conf = {'spark.sql.orc.aggregatePushdown': 'true',
+_orc_aggregate_pushdown_enabled_conf = {'spark.rapids.sql.format.orc.write.enabled': 'true',
+                                        'spark.sql.orc.aggregatePushdown': 'true',
                                         "spark.sql.sources.useV1SourceList": ""}

 def _do_orc_scan_with_agg(spark, path, agg):
+    spark.range(10).selectExpr("id", "id % 3 as p").write.mode("overwrite").orc(path)
     return spark.read.orc(path).selectExpr('{}(p)'.format(agg))

+def _do_orc_scan_with_agg_on_partitioned_column(spark, path, agg):
+    spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").orc(path)
+    return spark.read.orc(path).selectExpr('{}(p)'.format(agg))

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@@ -582,11 +588,7 @@ def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, aggregate):
| MAX | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_00.orc'
-    # GPU ORC write with statistics is not correctly working.
-    # Create ORC file in CPU session as a workaround
-    with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
-                     .orc(data_path))

# fallback to CPU
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
@@ -609,16 +611,10 @@ def test_orc_scan_with_aggregate_pushdown_on_col_partition(spark_tmp_path, aggre
| COUNT | Y | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_01.orc'
-    # GPU ORC write with statistics is not correctly working.
-    # Create ORC file in CPU session as a workaround
-    # Partition column P
-    with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
-                     .partitionBy("p")
-                     .orc(data_path))

# fallback to CPU only if aggregate is COUNT
assert_cpu_and_gpu_are_equal_collect_with_capture(
-        lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
+        lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=_orc_aggregate_pushdown_enabled_conf)
@@ -637,14 +633,8 @@ def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, ag
| MAX | Y | N |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_02.orc'
-    # GPU ORC write with statistics is not correctly working.
-    # Create ORC file in CPU session as a workaround
-    # Partition column P
-    with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
-                     .partitionBy("p")
-                     .orc(data_path))

# should not fallback to CPU
assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
+        lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
conf=_orc_aggregate_pushdown_enabled_conf)
