diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index a1c6a174fce..fd85b3a6267 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -76,13 +76,14 @@ def read_parquet_sql(data_path):
 
 
 # For now the native configs are not compatible with spark.sql.parquet.writeLegacyFormat written files
 # for nested types
-reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
-                    coalesce_parquet_file_reader_conf, native_parquet_file_reader_conf,
-                    native_multithreaded_parquet_file_reader_conf, native_coalesce_parquet_file_reader_conf]
+reader_opt_confs_native = [native_parquet_file_reader_conf, native_multithreaded_parquet_file_reader_conf,
+                           native_coalesce_parquet_file_reader_conf]
 reader_opt_confs_no_native = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
                               coalesce_parquet_file_reader_conf]
 
+reader_opt_confs = reader_opt_confs_native + reader_opt_confs_no_native
+
 @pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
 @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
@@ -341,7 +342,7 @@ def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reade
         lambda spark : spark.read.parquet(data_path),
         conf=all_confs)
 
-@pytest.mark.parametrize('reader_confs', reader_opt_confs)
+@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -365,7 +366,7 @@ def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs
         lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
         conf=all_confs)
 
-@pytest.mark.parametrize('reader_confs', reader_opt_confs)
+@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -391,6 +392,32 @@ def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, re
         lambda spark : spark.read.parquet(data_path),
         conf=all_confs)
 
+@pytest.mark.parametrize('reader_confs', reader_opt_confs_native)
+@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
+@allow_non_gpu('ColumnarToRowExec')
+def test_parquet_read_merge_schema_native_fallback(spark_tmp_path, v1_enabled_list, reader_confs):
+    # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
+    # we should go with a more standard set of generators
+    parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
+                    string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
+                    TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
+    first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
+    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
+    with_cpu_session(
+        lambda spark: gen_df(spark, first_gen_list).write.parquet(first_data_path),
+        conf=rebase_write_legacy_conf)
+    second_gen_list = [(('_c' if i % 2 == 0 else '_b') + str(i), gen) for i, gen in enumerate(parquet_gens)]
+    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
+    with_cpu_session(
+        lambda spark: gen_df(spark, second_gen_list).write.parquet(second_data_path),
+        conf=rebase_write_corrected_conf)
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+    assert_gpu_fallback_collect(
+        lambda spark: spark.read.option('mergeSchema', 'true').parquet(data_path),
+        cpu_fallback_class_name='FileSourceScanExec' if v1_enabled_list == 'parquet' else 'BatchScanExec',
+        conf=all_confs)
+
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 def test_parquet_input_meta(spark_tmp_path, v1_enabled_list, reader_confs):
diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index 5c10748cbd2..7892c02a0aa 100644
--- a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -30,6 +30,12 @@ class RapidsParquetScanMeta(
 
   override def tagSelfForGpu(): Unit = {
     GpuParquetScan.tagSupport(this)
+
+    if (pScan.options.getBoolean("mergeSchema", false) &&
+      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+        " mergeSchema is enabled")
+    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index f94df07f13b..60f5c3abc54 100644
--- a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -35,6 +35,12 @@ class RapidsParquetScanMeta(
       willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
         " on datasource V2 yet.")
     }
+
+    if (pScan.options.getBoolean("mergeSchema", false) &&
+      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+        " mergeSchema is enabled")
+    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index 05070f7041c..a251f29f1eb 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -40,6 +40,12 @@ class RapidsParquetScanMeta(
       willNotWorkOnGpu(
         "aggregates pushed into Parquet read, which is a metadata only operation")
     }
+
+    if (pScan.options.getBoolean("mergeSchema", false) &&
+      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+        " mergeSchema is enabled")
+    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala
index cf8f19083bd..7b47b1ff0cd 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -60,10 +60,15 @@ class GpuReadParquetFileFormat extends ParquetFileFormat with GpuReadFileFormatW
 object GpuReadParquetFileFormat {
   def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
     val fsse = meta.wrapped
-    GpuParquetScan.tagSupport(
-      SparkShimImpl.sessionFromPlan(fsse),
-      fsse.requiredSchema,
-      meta
-    )
+    val session = SparkShimImpl.sessionFromPlan(fsse)
+    GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta)
+
+    if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
+      val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf)
+      if (options.mergeSchema) {
+        meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" +
+          " mergeSchema is enabled")
+      }
+    }
   }
 }
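
For reference, the fallback this patch tests can be reproduced outside the integration-test harness. The sketch below is illustrative only: it assumes the RAPIDS plugin jar is on the classpath and that `spark.rapids.sql.format.parquet.reader.footer.type` is the user-facing key behind `conf.parquetReaderFooterType` (the key the `native_*_reader_conf` fixtures in `parquet_test.py` configure); the paths are placeholders.

```python
# Illustrative sketch (not part of the patch): trigger the mergeSchema + NATIVE
# footer reader fallback that test_parquet_read_merge_schema_native_fallback asserts.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         # Assumed config key for the native footer reader; JAVA/AUTO would not fall back.
         .config("spark.rapids.sql.format.parquet.reader.footer.type", "NATIVE")
         .getOrCreate())

# Two partitions whose schemas only partially overlap, so mergeSchema has work to do.
spark.range(10).selectExpr("id AS _c0").write.parquet("/tmp/PARQUET_DATA/key=0")
spark.range(10).selectExpr("id AS _b0").write.parquet("/tmp/PARQUET_DATA/key=1")

# With mergeSchema enabled, tagSelfForGpu/tagSupport should now tag the scan with
# "Native footer reader for parquet does not work when mergeSchema is enabled",
# leaving a CPU FileSourceScanExec (v1) or BatchScanExec (v2) in the plan.
df = spark.read.option("mergeSchema", "true").parquet("/tmp/PARQUET_DATA")
df.explain()
```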