From a5e1e1e762767e444ecb502ef4a31e272d1503b8 Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Thu, 19 May 2022 17:23:48 +0800
Subject: [PATCH] Fallback parquet reading with merged schema and native footer reader (#5500)

The native footer reader for Parquet fetches data fields based entirely on
the read schema, which can lead to a buffer overflow when schema merging is
enabled. With mergeSchema enabled, the file schema of each file partition may
not contain the complete (read) schema, so the native footer reader produces
incorrect footers.

Fall back to reading Parquet on the CPU when mergeSchema and the native
footer reader are both enabled, to avoid buffer overflows like #5493.
---
 .../src/main/python/parquet_test.py           | 37 ++++++++++++++++---
 .../rapids/shims/RapidsParquetScanMeta.scala  |  6 +++
 .../rapids/shims/RapidsParquetScanMeta.scala  |  6 +++
 .../rapids/shims/RapidsParquetScanMeta.scala  |  6 +++
 .../rapids/GpuReadParquetFileFormat.scala     | 17 ++++++---
 5 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index a1c6a174fce..fd85b3a6267 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -76,13 +76,14 @@ def read_parquet_sql(data_path):
 
 # For now the native configs are not compatible with spark.sql.parquet.writeLegacyFormat written files
 # for nested types
-reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
-                    coalesce_parquet_file_reader_conf, native_parquet_file_reader_conf,
-                    native_multithreaded_parquet_file_reader_conf, native_coalesce_parquet_file_reader_conf]
+reader_opt_confs_native = [native_parquet_file_reader_conf, native_multithreaded_parquet_file_reader_conf,
+                           native_coalesce_parquet_file_reader_conf]
 
 reader_opt_confs_no_native = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
                               coalesce_parquet_file_reader_conf]
 
+reader_opt_confs = reader_opt_confs_native + reader_opt_confs_no_native
+
 @pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
 @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
@@ -341,7 +342,7 @@ def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reade
             lambda spark : spark.read.parquet(data_path),
             conf=all_confs)
 
-@pytest.mark.parametrize('reader_confs', reader_opt_confs)
+@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -365,7 +366,7 @@ def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs
             lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
             conf=all_confs)
 
-@pytest.mark.parametrize('reader_confs', reader_opt_confs)
+@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -391,6 +392,32 @@ def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, re
             lambda spark : spark.read.parquet(data_path),
             conf=all_confs)
 
+@pytest.mark.parametrize('reader_confs', reader_opt_confs_native)
+@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
+@allow_non_gpu('ColumnarToRowExec')
+def test_parquet_read_merge_schema_native_fallback(spark_tmp_path, v1_enabled_list, reader_confs):
+    # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
+    # we should go with a more standard set of generators
+    parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
+                    string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
+                    TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
+    first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
+    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
+    with_cpu_session(
+            lambda spark: gen_df(spark, first_gen_list).write.parquet(first_data_path),
+            conf=rebase_write_legacy_conf)
+    second_gen_list = [(('_c' if i % 2 == 0 else '_b') + str(i), gen) for i, gen in enumerate(parquet_gens)]
+    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
+    with_cpu_session(
+            lambda spark: gen_df(spark, second_gen_list).write.parquet(second_data_path),
+            conf=rebase_write_corrected_conf)
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+    assert_gpu_fallback_collect(
+            lambda spark: spark.read.option('mergeSchema', 'true').parquet(data_path),
+            cpu_fallback_class_name='FileSourceScanExec' if v1_enabled_list == 'parquet' else 'BatchScanExec',
+            conf=all_confs)
+
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 def test_parquet_input_meta(spark_tmp_path, v1_enabled_list, reader_confs):
diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index 5c10748cbd2..7892c02a0aa 100644
--- a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -30,6 +30,12 @@ class RapidsParquetScanMeta(
 
   override def tagSelfForGpu(): Unit = {
     GpuParquetScan.tagSupport(this)
+
+    if (pScan.options.getBoolean("mergeSchema", false) &&
+      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+        " mergeSchema is enabled")
+    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index f94df07f13b..60f5c3abc54 100644
--- a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -35,6 +35,12 @@ class RapidsParquetScanMeta(
       willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
         " on datasource V2 yet.")
     }
+
+    if (pScan.options.getBoolean("mergeSchema", false) &&
+      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+        " mergeSchema is enabled")
+    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index 05070f7041c..a251f29f1eb 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -40,6 +40,12 @@ class RapidsParquetScanMeta(
       willNotWorkOnGpu(
         "aggregates pushed into Parquet read, which is a metadata only operation")
     }
+
+    if (pScan.options.getBoolean("mergeSchema", false) &&
+      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+        " mergeSchema is enabled")
+    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala
index cf8f19083bd..7b47b1ff0cd 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -60,10 +60,15 @@ class GpuReadParquetFileFormat extends ParquetFileFormat with GpuReadFileFormatW
 object GpuReadParquetFileFormat {
   def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
     val fsse = meta.wrapped
-    GpuParquetScan.tagSupport(
-      SparkShimImpl.sessionFromPlan(fsse),
-      fsse.requiredSchema,
-      meta
-    )
+    val session = SparkShimImpl.sessionFromPlan(fsse)
+    GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta)
+
+    if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf)
+      if (options.mergeSchema) {
+        meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+          " mergeSchema is enabled")
+      }
+    }
   }
 }
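
For reference, a minimal standalone sketch of the scenario the new test exercises: two partitions written with different column sets, so the merged read schema is wider than any single file's footer. This is not part of the patch; the paths, column names, and session setup are illustrative assumptions, and the footer-reader config key shown is the plugin setting backing conf.parquetReaderFooterType.

from pyspark.sql import SparkSession

# Assumes the spark-rapids plugin jar is on the classpath.
spark = (SparkSession.builder
         .config('spark.plugins', 'com.nvidia.spark.SQLPlugin')
         # Assumed config key selecting the native footer reader.
         .config('spark.rapids.sql.format.parquet.reader.footer.type', 'NATIVE')
         .getOrCreate())

# Two partitions whose schemas only partially overlap, mirroring the
# first_gen_list/second_gen_list split in the new test.
spark.range(100).selectExpr('id AS _c0', 'id * 2 AS _c1') \
    .write.mode('overwrite').parquet('/tmp/PARQUET_DATA/key=0')
spark.range(100).selectExpr('id AS _c0', 'CAST(id AS STRING) AS _b1') \
    .write.mode('overwrite').parquet('/tmp/PARQUET_DATA/key=1')

# The merged read schema (_c0, _c1, _b1) is wider than either file's footer,
# so with this patch the scan is tagged willNotWorkOnGpu and runs on the CPU
# instead of overflowing the native footer reader (#5493).
spark.read.option('mergeSchema', 'true').parquet('/tmp/PARQUET_DATA').show()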