Fallback parquet reading with merged schema and native footer reader (#5500)

The native parquet footer reader fetches data fields based entirely on the read schema, which can lead to an overflow when schema merging is enabled. With mergeSchema enabled, the file schema of an individual file partition may not contain the complete (read) schema, and in that situation the native footer reader produces incorrect footers.

Fall back parquet reading to the CPU when mergeSchema and the native footer reader are both enabled, to avoid buffer overflows like #5493.
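For illustration, a minimal PySpark sketch of the combination this commit guards against. The config key spark.rapids.sql.format.parquet.reader.footer.type and the value NATIVE are assumptions inferred from the RapidsConf.ParquetFooterReaderType check in the diff below; the data path is a placeholder.

from pyspark.sql import SparkSession

# Build a session with the RAPIDS plugin and the (assumed) native footer reader.
spark = (SparkSession.builder
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.rapids.sql.format.parquet.reader.footer.type", "NATIVE")
         .getOrCreate())

# With this commit, reading with mergeSchema enabled is tagged willNotWorkOnGpu
# and runs on the CPU (FileSourceScanExec / BatchScanExec) instead of risking
# a buffer overflow in the native footer reader.
df = spark.read.option("mergeSchema", "true").parquet("/data/parquet_partitions")
df.collect()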
sperlingxx authored May 19, 2022
1 parent 4f29451 commit a5e1e1e
Showing 5 changed files with 61 additions and 11 deletions.
37 changes: 32 additions & 5 deletions integration_tests/src/main/python/parquet_test.py
@@ -76,13 +76,14 @@ def read_parquet_sql(data_path):

# For now the native configs are not compatible with spark.sql.parquet.writeLegacyFormat written files
# for nested types
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf, native_parquet_file_reader_conf,
native_multithreaded_parquet_file_reader_conf, native_coalesce_parquet_file_reader_conf]
reader_opt_confs_native = [native_parquet_file_reader_conf, native_multithreaded_parquet_file_reader_conf,
native_coalesce_parquet_file_reader_conf]

reader_opt_confs_no_native = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]

reader_opt_confs = reader_opt_confs_native + reader_opt_confs_no_native

@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@@ -341,7 +342,7 @@ def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reade
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -365,7 +366,7 @@ def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs
lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -391,6 +392,32 @@ def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, re
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs_native)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@allow_non_gpu('ColumnarToRowExec')
def test_parquet_read_merge_schema_native_fallback(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
lambda spark: gen_df(spark, first_gen_list).write.parquet(first_data_path),
conf=rebase_write_legacy_conf)
second_gen_list = [(('_c' if i % 2 == 0 else '_b') + str(i), gen) for i, gen in enumerate(parquet_gens)]
second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
with_cpu_session(
lambda spark: gen_df(spark, second_gen_list).write.parquet(second_data_path),
conf=rebase_write_corrected_conf)
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_fallback_collect(
lambda spark: spark.read.option('mergeSchema', 'true').parquet(data_path),
cpu_fallback_class_name='FileSourceScanExec' if v1_enabled_list == 'parquet' else 'BatchScanExec',
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_input_meta(spark_tmp_path, v1_enabled_list, reader_confs):
@@ -30,6 +30,12 @@ class RapidsParquetScanMeta(

override def tagSelfForGpu(): Unit = {
GpuParquetScan.tagSupport(this)

if (pScan.options.getBoolean("mergeSchema", false) &&
conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}

override def convertToGpu(): Scan = {
@@ -35,6 +35,12 @@ class RapidsParquetScanMeta(
willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}

if (pScan.options.getBoolean("mergeSchema", false) &&
conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}

override def convertToGpu(): Scan = {
@@ -40,6 +40,12 @@ class RapidsParquetScanMeta(
willNotWorkOnGpu(
"aggregates pushed into Parquet read, which is a metadata only operation")
}

if (pScan.options.getBoolean("mergeSchema", false) &&
conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}

override def convertToGpu(): Scan = {
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -60,10 +60,15 @@ class GpuReadParquetFileFormat extends ParquetFileFormat with GpuReadFileFormatW
object GpuReadParquetFileFormat {
def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
val fsse = meta.wrapped
GpuParquetScan.tagSupport(
SparkShimImpl.sessionFromPlan(fsse),
fsse.requiredSchema,
meta
)
val session = SparkShimImpl.sessionFromPlan(fsse)
GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta)

if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf)
if (options.mergeSchema) {
meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}
}
}
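Note that on the V1 read path the check above resolves mergeSchema through ParquetOptions, so it honors both the per-read option and the session conf. A hedged continuation of the earlier sketch, reusing the same spark session and placeholder path:

# Sketch: spark.sql.parquet.mergeSchema is the standard Spark SQL conf that
# ParquetOptions falls back to when no per-read "mergeSchema" option is set,
# so enabling it session-wide is expected to trigger the same CPU fallback.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("/data/parquet_partitions")  # no per-read option needed
df.collect()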
