From 5f0ceb57d8f832491b9df3c6a677e62ca220ad85 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 8 Jul 2021 17:38:55 +0800 Subject: [PATCH 01/14] orc reader supports struct and list Signed-off-by: Firestarman --- docs/supported_ops.md | 4 ++-- integration_tests/src/main/python/orc_test.py | 21 +++++++++++++++++-- .../nvidia/spark/rapids/GpuOverrides.scala | 3 ++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 001fe0c2e98..1ede3590f1f 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -20918,9 +20918,9 @@ dates or timestamps, or for a lack of type coercion support. NS +PS* (missing nested DECIMAL, BINARY, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, BINARY, MAP, UDT) NS diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index 43711c55bb1..c959942dacd 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -45,9 +45,26 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl, read_func(std_input_path + '/' + name), conf=all_confs) -orc_gens_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, +orc_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))], + TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))] + +orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)]) + +# Some array gens, but not all because of nesting +orc_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_basic_gens] + [ + ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), + ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10), + ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))] + +# Some struct gens, but not all because of nesting +orc_struct_gens_sample = [orc_basic_struct_gen, + StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]]), + StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])] + +orc_gens_list = [orc_basic_gens, + orc_array_gens_sample, + orc_struct_gens_sample, pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')), pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))] diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 8a3a1fbeb2c..abe3d56074c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -769,7 +769,8 @@ object GpuOverrides { sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested())), (OrcFormatType, FileFormatChecks( - cudfReadWrite = TypeSig.commonCudfTypes, + cudfRead = (TypeSig.commonCudfTypes + TypeSig.STRUCT + TypeSig.ARRAY).nested(), + cudfWrite = TypeSig.commonCudfTypes, sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested()))) From 5b32a8db3e6be34faf2b53379c31fe2cc599bd0e Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 9 Jul 2021 15:40:47 +0800 Subject: [PATCH 02/14] Update the tests Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index c959942dacd..e27c4efe301 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -49,18 +49,22 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))] -orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)]) +# Set to true after the issue https://github.com/rapidsai/cudf/issues/8704 is fixed. +struct_gen_nullable = False + +orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)], + struct_gen_nullable) # Some array gens, but not all because of nesting orc_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_basic_gens] + [ - ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), - ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10), - ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))] + ArrayGen(ArrayGen(short_gen, max_length=10)), + ArrayGen(ArrayGen(string_gen, max_length=10)), + ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]], struct_gen_nullable))] # Some struct gens, but not all because of nesting orc_struct_gens_sample = [orc_basic_struct_gen, - StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]]), - StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])] + StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]], struct_gen_nullable), + StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]], struct_gen_nullable)] orc_gens_list = [orc_basic_gens, orc_array_gens_sample, From a2749c60cd80dc497daa463db9f590aa133514b7 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 14 Jul 2021 12:00:35 +0800 Subject: [PATCH 03/14] Address comments Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index e27c4efe301..bcf9cc2feb5 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -49,22 +49,18 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))] -# Set to true after the issue https://github.com/rapidsai/cudf/issues/8704 is fixed. -struct_gen_nullable = False - -orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)], - struct_gen_nullable) +orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)]) # Some array gens, but not all because of nesting orc_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_basic_gens] + [ ArrayGen(ArrayGen(short_gen, max_length=10)), ArrayGen(ArrayGen(string_gen, max_length=10)), - ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]], struct_gen_nullable))] + ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))] # Some struct gens, but not all because of nesting orc_struct_gens_sample = [orc_basic_struct_gen, - StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]], struct_gen_nullable), - StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]], struct_gen_nullable)] + StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]]), + StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])] orc_gens_list = [orc_basic_gens, orc_array_gens_sample, From 9f46068c22c49313edc0e7adc02cede0bfce2e20 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 16 Jul 2021 14:45:18 +0800 Subject: [PATCH 04/14] support only struct Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 4 ++-- .../src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 2 +- tools/src/main/resources/supportedDataSource.csv | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index d84977b6986..c91704f90ed 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -54,8 +54,8 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl, # Some array gens, but not all because of nesting orc_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_basic_gens] + [ - ArrayGen(ArrayGen(short_gen, max_length=10)), - ArrayGen(ArrayGen(string_gen, max_length=10)), + ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), + ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10), ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))] # Some struct gens, but not all because of nesting diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index e273f1c5279..db50d260e20 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -768,7 +768,7 @@ object GpuOverrides { sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested())), (OrcFormatType, FileFormatChecks( - cudfRead = (TypeSig.commonCudfTypes + TypeSig.STRUCT + TypeSig.ARRAY).nested(), + cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), cudfWrite = TypeSig.commonCudfTypes, sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested()))) diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index cd327bb6c4b..7c75d430f70 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -1,4 +1,4 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,CO,NA,NS,NA,NA,NA,NA,NA -ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,NS,NS,NS,NS +ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,NS,PS*,NS Parquet,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,PS*,PS*,NS From 3e1d6b6161544df059b18ce4b9fcdbfe82a5679e Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 21 Jul 2021 10:58:06 +0800 Subject: [PATCH 05/14] tool module update Signed-off-by: Firestarman --- .../QualificationExpectations/complex_dec_expectation.csv | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv b/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv index 4a59efccf04..dc35a000774 100644 --- a/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv +++ b/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types -Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:struct:decimal] +Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:decimal] From cecadba2e2a7901460d4df3e815a5b506496bf50 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 22 Jul 2021 16:53:02 +0800 Subject: [PATCH 06/14] Add tests for nested column pruning Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 19 ++++++++++++ .../com/nvidia/spark/rapids/GpuOrcScan.scala | 30 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index c91704f90ed..f3317e696ac 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -20,6 +20,7 @@ from marks import * from pyspark.sql.types import * from spark_session import with_cpu_session, with_spark_session +from parquet_test import _nested_pruning_schemas def read_orc_df(data_path): return lambda spark : spark.read.orc(data_path) @@ -281,3 +282,21 @@ def test_missing_column_names_filter(spark_tmp_table_factory, reader_confs): assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.sql("SELECT _col3,_col2 FROM {} WHERE _col2 = '155'".format(table_name)), all_confs) + + +@pytest.mark.parametrize('data_gen,read_schema', _nested_pruning_schemas, ids=idfn) +@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) +@pytest.mark.parametrize('v1_enabled_list', ["", "orc"]) +@pytest.mark.parametrize('nested_enabled', ["true", "false"]) +def test_read_nested_pruning(spark_tmp_path, data_gen, read_schema, reader_confs, v1_enabled_list, nested_enabled): + data_path = spark_tmp_path + '/ORC_DATA' + with_cpu_session( + lambda spark : gen_df(spark, data_gen).write.orc(data_path)) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.optimizer.nestedSchemaPruning.enabled': nested_enabled}) + # This is a hack to get the type in a slightly less verbose way + rs = StructGen(read_schema, nullable=False).data_type + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.schema(rs).orc(data_path), + conf=all_confs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index adf59243686..d94dfa35d1f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -1087,7 +1087,11 @@ private case class GpuOrcFileFilterHandler( val newReadSchema = TypeDescription.createStruct() readerFieldNames.zip(readerChildren).foreach { case (readField, readType) => val (fileType, fileFieldName) = fileTypesMap.getOrElse(readField, (null, null)) - if (readType != fileType) { + // When column pruning is enabled, the readType is not always equal to the fileType, + // may be part of the fileType. e.g. + // read type: struct + // file type: struct + if (!isSchemaCompatible(fileType, readType)) { throw new QueryExecutionException("Incompatible schemas for ORC file" + s" at ${partFile.filePath}\n" + s" file schema: $fileSchema\n" + @@ -1099,6 +1103,30 @@ private case class GpuOrcFileFilterHandler( newReadSchema } + /** + * The read schema is compatible with the file schema only when + * 1) They are equal to each other + * 2) The read schema is part of the file schema for struct types. + * + * @param fileSchema input file's ORC schema + * @param readSchema ORC schema for what will be read + * @return true if they are compatible, otherwise false + */ + private def isSchemaCompatible( + fileSchema: TypeDescription, + readSchema: TypeDescription): Boolean = { + fileSchema == readSchema || + fileSchema != null && readSchema != null && + fileSchema.getCategory == readSchema.getCategory && { + if (readSchema.getChildren != null) { + readSchema.getChildren.asScala.forall(rc => + fileSchema.getChildren.asScala.exists(fc => isSchemaCompatible(fc, rc))) + } else { + true + } + } + } + /** * Build an ORC search argument applier that can filter input file splits * when predicate push-down filters have been specified. From b724f000cfe45626797d3c22f3d92d722b2a5dd0 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 23 Jul 2021 09:27:33 +0800 Subject: [PATCH 07/14] default to false Signed-off-by: Firestarman --- .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index d94dfa35d1f..838b97f71bc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -1122,7 +1122,7 @@ private case class GpuOrcFileFilterHandler( readSchema.getChildren.asScala.forall(rc => fileSchema.getChildren.asScala.exists(fc => isSchemaCompatible(fc, rc))) } else { - true + false } } } From 2a3023411a07bbdc9e11c96a81ea4cdee7d6c9af Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 26 Jul 2021 11:32:09 +0800 Subject: [PATCH 08/14] Fallback to cpu for nested schema pruning. Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 19 ---------- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 35 ++++--------------- 2 files changed, 6 insertions(+), 48 deletions(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index f3317e696ac..c91704f90ed 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -20,7 +20,6 @@ from marks import * from pyspark.sql.types import * from spark_session import with_cpu_session, with_spark_session -from parquet_test import _nested_pruning_schemas def read_orc_df(data_path): return lambda spark : spark.read.orc(data_path) @@ -282,21 +281,3 @@ def test_missing_column_names_filter(spark_tmp_table_factory, reader_confs): assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.sql("SELECT _col3,_col2 FROM {} WHERE _col2 = '155'".format(table_name)), all_confs) - - -@pytest.mark.parametrize('data_gen,read_schema', _nested_pruning_schemas, ids=idfn) -@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) -@pytest.mark.parametrize('v1_enabled_list', ["", "orc"]) -@pytest.mark.parametrize('nested_enabled', ["true", "false"]) -def test_read_nested_pruning(spark_tmp_path, data_gen, read_schema, reader_confs, v1_enabled_list, nested_enabled): - data_path = spark_tmp_path + '/ORC_DATA' - with_cpu_session( - lambda spark : gen_df(spark, data_gen).write.orc(data_path)) - all_confs = reader_confs.copy() - all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, - 'spark.sql.optimizer.nestedSchemaPruning.enabled': nested_enabled}) - # This is a hack to get the type in a slightly less verbose way - rs = StructGen(read_schema, nullable=False).data_type - assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.read.schema(rs).orc(data_path), - conf=all_confs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 838b97f71bc..7718d3332d2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -124,6 +124,11 @@ object GpuOrcScanBase { .getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) { meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet") } + + if (sparkSession.conf + .getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean)) { + meta.willNotWorkOnGpu("nested schema pruning is not supported yet for ORC") + } } } @@ -1087,11 +1092,7 @@ private case class GpuOrcFileFilterHandler( val newReadSchema = TypeDescription.createStruct() readerFieldNames.zip(readerChildren).foreach { case (readField, readType) => val (fileType, fileFieldName) = fileTypesMap.getOrElse(readField, (null, null)) - // When column pruning is enabled, the readType is not always equal to the fileType, - // may be part of the fileType. e.g. - // read type: struct - // file type: struct - if (!isSchemaCompatible(fileType, readType)) { + if (readType != fileType) { throw new QueryExecutionException("Incompatible schemas for ORC file" + s" at ${partFile.filePath}\n" + s" file schema: $fileSchema\n" + @@ -1103,30 +1104,6 @@ private case class GpuOrcFileFilterHandler( newReadSchema } - /** - * The read schema is compatible with the file schema only when - * 1) They are equal to each other - * 2) The read schema is part of the file schema for struct types. - * - * @param fileSchema input file's ORC schema - * @param readSchema ORC schema for what will be read - * @return true if they are compatible, otherwise false - */ - private def isSchemaCompatible( - fileSchema: TypeDescription, - readSchema: TypeDescription): Boolean = { - fileSchema == readSchema || - fileSchema != null && readSchema != null && - fileSchema.getCategory == readSchema.getCategory && { - if (readSchema.getChildren != null) { - readSchema.getChildren.asScala.forall(rc => - fileSchema.getChildren.asScala.exists(fc => isSchemaCompatible(fc, rc))) - } else { - false - } - } - } - /** * Build an ORC search argument applier that can filter input file splits * when predicate push-down filters have been specified. From b7fdeb71359c7e654119a9d612a83859f3dbf2ae Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 26 Jul 2021 11:55:06 +0800 Subject: [PATCH 09/14] Updated tests Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 12 ++++++++++-- .../scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index c91704f90ed..2e2715df0d6 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -58,7 +58,13 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl, ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10), ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))] -# Some struct gens, but not all because of nesting +# Some struct gens, but not all because of nesting. +# No empty struct gen because it leads to an error as below. +# ''' +# E pyspark.sql.utils.AnalysisException: +# E Datasource does not support writing empty or nested empty schemas. +# E Please make sure the data schema has at least one or more column(s). +# ''' orc_struct_gens_sample = [orc_basic_struct_gen, StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]]), StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])] @@ -98,7 +104,9 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e with_cpu_session( lambda spark : gen_df(spark, gen_list).write.orc(data_path)) all_confs = reader_confs.copy() - all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) + # Nested schema pruning is not supported yet for orc read. + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"}) assert_gpu_and_cpu_are_equal_collect( read_func(data_path), conf=all_confs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 7718d3332d2..a817c34fd45 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -127,7 +127,7 @@ object GpuOrcScanBase { if (sparkSession.conf .getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean)) { - meta.willNotWorkOnGpu("nested schema pruning is not supported yet for ORC") + meta.willNotWorkOnGpu("nested schema pruning is not supported yet") } } } From 79dbfe453e2b52ef08673f2610a64856d173ec00 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 26 Jul 2021 11:55:06 +0800 Subject: [PATCH 10/14] Fall back to the CPU only finding struct column Signed-off-by: Firestarman --- .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index a817c34fd45..a98e1259c45 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -126,7 +126,8 @@ object GpuOrcScanBase { } if (sparkSession.conf - .getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean)) { + .getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean) && + schema.exists(_.dataType.isInstanceOf[StructType])) { meta.willNotWorkOnGpu("nested schema pruning is not supported yet") } } From 4d188852c78867b8929274b83ddafb620a12a9e6 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 26 Jul 2021 14:42:06 +0800 Subject: [PATCH 11/14] Add tests for nested predicate pushdown. Signed-off-by: Firestarman --- integration_tests/src/main/python/orc_test.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index 2e2715df0d6..30b49196173 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -106,7 +106,7 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e all_confs = reader_confs.copy() # Nested schema pruning is not supported yet for orc read. all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, - 'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"}) + 'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"}) assert_gpu_and_cpu_are_equal_collect( read_func(data_path), conf=all_confs) @@ -127,15 +127,19 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/ORC_DATA' - gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen)] + # Append two struct columns to verify nested predicate pushdown. + gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen), + ('s1', StructGen([['sa', orc_gen]])), + ('s2', StructGen([['sa', StructGen([['ssa', orc_gen]])]]))] s0 = gen_scalar(orc_gen, force_no_nulls=True) with_cpu_session( lambda spark : gen_df(spark, gen_list).orderBy('a').write.orc(data_path)) all_confs = reader_confs.copy() - all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"}) rf = read_func(data_path) assert_gpu_and_cpu_are_equal_collect( - lambda spark: rf(spark).select(f.col('a') >= s0), + lambda spark: rf(spark).select(f.col('a') >= s0, f.col('s1.sa') >= s0, f.col('s2.sa.ssa') >= s0), conf=all_confs) orc_compress_options = ['none', 'uncompressed', 'snappy', 'zlib'] From de93d07bb5d49244057dcd0237c18db04a66ae2e Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 26 Jul 2021 16:57:39 +0800 Subject: [PATCH 12/14] correct the fallback condition Signed-off-by: Firestarman --- .../scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index a98e1259c45..c370393890f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -58,7 +58,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.OrcFilters import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -126,13 +126,21 @@ object GpuOrcScanBase { } if (sparkSession.conf - .getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean) && - schema.exists(_.dataType.isInstanceOf[StructType])) { + .getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean) && + schema.exists(f => hasStructType(f.dataType))) { meta.willNotWorkOnGpu("nested schema pruning is not supported yet") } } + + private def hasStructType(dt: DataType): Boolean = dt match { + case m: MapType => hasStructType(m.keyType) || hasStructType(m.valueType) + case a: ArrayType => hasStructType(a.elementType) + case _: StructType => true + case _ => false + } } + /** * The multi-file partition reader factory for creating cloud reading or coalescing reading for * ORC file format. From 4506d0e2a87987079c8861d351f04e738cfc64c9 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 26 Jul 2021 17:07:06 +0800 Subject: [PATCH 13/14] remove extra line Signed-off-by: Firestarman --- integration_tests/run_pyspark_from_build.sh | 2 +- .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index 5e153ec291d..75127266d74 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -102,7 +102,7 @@ else RUN_TESTS_COMMAND=("$SCRIPTPATH"/runtests.py --rootdir "$LOCAL_ROOTDIR" - "$LOCAL_ROOTDIR"/src/main/python) + "$LOCAL_ROOTDIR"/src/main/python/orc_test.py) TEST_COMMON_OPTS=(-v -rfExXs diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index c370393890f..60ffee2651f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -140,7 +140,6 @@ object GpuOrcScanBase { } } - /** * The multi-file partition reader factory for creating cloud reading or coalescing reading for * ORC file format. From a1240e06a760893ac0216bacbf936decc423b82d Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Tue, 27 Jul 2021 12:57:57 +0800 Subject: [PATCH 14/14] Address comments --- integration_tests/run_pyspark_from_build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index 75127266d74..5e153ec291d 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -102,7 +102,7 @@ else RUN_TESTS_COMMAND=("$SCRIPTPATH"/runtests.py --rootdir "$LOCAL_ROOTDIR" - "$LOCAL_ROOTDIR"/src/main/python/orc_test.py) + "$LOCAL_ROOTDIR"/src/main/python) TEST_COMMON_OPTS=(-v -rfExXs