Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GPU ORC statistics write support #5715

Merged
merged 3 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,7 @@ file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.

**Limitations With RAPIDS**

RAPIDS does not support whole file statistics in ORC file. We are working with
[CUDF](https://github.com/rapidsai/cudf) to support writing statistics and you can track it
[here](https://github.com/rapidsai/cudf/issues/5826).
RAPIDS does not support whole file statistics in ORC file in releases _prior_ to release 22.06.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
RAPIDS does not support whole file statistics in ORC file in releases _prior_ to release 22.06.
RAPIDS does not support whole file statistics in ORC file in releases prior to release 22.06.


*Writing ORC Files*

tgravescs marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
32 changes: 11 additions & 21 deletions integration_tests/src/main/python/orc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,16 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li
_aggregate_orc_list_col_partition = ['COUNT']
_aggregate_orc_list_no_col_partition = ['MAX', 'MIN']
_aggregate_orc_list = _aggregate_orc_list_col_partition + _aggregate_orc_list_no_col_partition
_orc_aggregate_pushdown_enabled_conf = {'spark.sql.orc.aggregatePushdown': 'true',
_orc_aggregate_pushdown_enabled_conf = {'spark.rapids.sql.format.orc.write.enabled': 'true',
'spark.sql.orc.aggregatePushdown': 'true',
"spark.sql.sources.useV1SourceList": ""}

def _do_orc_scan_with_agg(spark, path, agg):
    """Write a small ORC dataset to ``path`` and aggregate column ``p`` over it.

    The dataset is ``spark.range(10)`` projected to two columns: ``id`` and
    ``p`` (``id % 3``). It is written unpartitioned in overwrite mode, so the
    same path can be written by repeated calls.

    :param spark: Spark session used for both the write and the read.
    :param path: Destination of the ORC output, also used as the read location.
    :param agg: Name of the aggregate function applied to ``p`` (e.g. ``MAX``).
    :return: DataFrame produced by selecting ``<agg>(p)`` from the ORC data.
    """
    source_df = spark.range(10).selectExpr("id", "id % 3 as p")
    # Overwrite so that a second invocation against the same path succeeds.
    source_df.write.mode("overwrite").orc(path)

    written_df = spark.read.orc(path)
    return written_df.selectExpr('{}(p)'.format(agg))

def _do_orc_scan_with_agg_on_partitioned_column(spark, path, agg):
    """Write a small ORC dataset partitioned by ``p`` and aggregate ``p`` over it.

    The dataset is ``spark.range(10)`` projected to two columns: ``id`` and
    ``p`` (``id % 3``). It is written in overwrite mode with ``p`` as the
    partition column, so the aggregate is evaluated on a partition column.

    :param spark: Spark session used for both the write and the read.
    :param path: Destination of the ORC output, also used as the read location.
    :param agg: Name of the aggregate function applied to ``p`` (e.g. ``COUNT``).
    :return: DataFrame produced by selecting ``<agg>(p)`` from the ORC data.
    """
    source_df = spark.range(10).selectExpr("id", "id % 3 as p")
    # Partition on "p", overwriting whatever a previous call left at the path.
    partitioned_writer = source_df.write.partitionBy("p").mode("overwrite")
    partitioned_writer.orc(path)

    written_df = spark.read.orc(path)
    return written_df.selectExpr('{}(p)'.format(agg))

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
Expand All @@ -582,11 +588,7 @@ def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, aggregate):
| MAX | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_00.orc'
# GPU ORC write with statistics is not correctly working.
# Create ORC file in CPU session as a workaround
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.orc(data_path))


# fallback to CPU
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
Expand All @@ -609,16 +611,10 @@ def test_orc_scan_with_aggregate_pushdown_on_col_partition(spark_tmp_path, aggre
| COUNT | Y | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_01.orc'
# GPU ORC write with statistics is not correctly working.
# Create ORC file in CPU session as a workaround
# Partition column P
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.partitionBy("p")
.orc(data_path))


# fallback to CPU only if aggregate is COUNT
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=_orc_aggregate_pushdown_enabled_conf)
Expand All @@ -637,14 +633,8 @@ def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, ag
| MAX | Y | N |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_02.orc'
# GPU ORC write with statistics is not correctly working.
# Create ORC file in CPU session as a workaround
# Partition column P
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.partitionBy("p")
.orc(data_path))

# should not fallback to CPU
assert_gpu_and_cpu_are_equal_collect(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
conf=_orc_aggregate_pushdown_enabled_conf)