Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC reader supports struct #2887

Merged
merged 17 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -20918,9 +20918,9 @@ dates or timestamps, or for a lack of type coercion support.
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, BINARY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, MAP, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
Expand Down
31 changes: 26 additions & 5 deletions integration_tests/src/main/python/orc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,28 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]

orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)])

# Some array gens, but not all because of nesting
orc_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_basic_gens] + [
ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)]
ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]

# Some struct gens, but not all because of nesting.
# No empty struct gen because it leads to an error as below.
# '''
# E pyspark.sql.utils.AnalysisException:
# E Datasource does not support writing empty or nested empty schemas.
# E Please make sure the data schema has at least one or more column(s).
# '''
orc_struct_gens_sample = [orc_basic_struct_gen,
StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]]),
StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]

orc_gens_list = [orc_basic_gens,
orc_array_gens_sample,
orc_struct_gens_sample,
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')),
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))]

Expand Down Expand Up @@ -89,7 +104,9 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.orc(data_path))
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
# Nested schema pruning is not supported yet for orc read.
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"})
assert_gpu_and_cpu_are_equal_collect(
read_func(data_path),
conf=all_confs)
Expand All @@ -110,15 +127,19 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/ORC_DATA'
gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen)]
# Append two struct columns to verify nested predicate pushdown.
gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen),
('s1', StructGen([['sa', orc_gen]])),
('s2', StructGen([['sa', StructGen([['ssa', orc_gen]])]]))]
s0 = gen_scalar(orc_gen, force_no_nulls=True)
with_cpu_session(
lambda spark : gen_df(spark, gen_list).orderBy('a').write.orc(data_path))
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"})
rf = read_func(data_path)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: rf(spark).select(f.col('a') >= s0),
lambda spark: rf(spark).select(f.col('a') >= s0, f.col('s1.sa') >= s0, f.col('s2.sa.ssa') >= s0),
conf=all_confs)

orc_compress_options = ['none', 'uncompressed', 'snappy', 'zlib']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.OrcFilters
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

Expand Down Expand Up @@ -124,6 +124,19 @@ object GpuOrcScanBase {
.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}

if (sparkSession.conf
.getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean) &&
schema.exists(f => hasStructType(f.dataType))) {
meta.willNotWorkOnGpu("nested schema pruning is not supported yet")
}
}

private def hasStructType(dt: DataType): Boolean = dt match {
case m: MapType => hasStructType(m.keyType) || hasStructType(m.valueType)
case a: ArrayType => hasStructType(a.elementType)
case _: StructType => true
case _ => false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ object GpuOverrides {
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())),
(OrcFormatType, FileFormatChecks(
cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY).nested(),
cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
cudfWrite = TypeSig.commonCudfTypes,
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())))
Expand Down
2 changes: 1 addition & 1 deletion tools/src/main/resources/supportedDataSource.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT
CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,CO,NA,NS,NA,NA,NA,NA,NA
ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,NS,NS,NS
ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,NS,PS*,NS
Parquet,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,PS*,PS*,NS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types
Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:struct:decimal]
Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:decimal]