Adding AUTO native parquet support and legacy tests #5983

Merged: 7 commits merged on Jul 19, 2022
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -94,7 +94,7 @@ Name | Description | Default Value
<a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|2147483647
<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small Parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.reader.footer.type"></a>spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. JAVA is the default and should match closely with Apache Spark. NATIVE will parse and filter the footer using C++. In the worst case this can be slower than JAVA, but not by much if anything. This is still a very experimental feature and there are known bugs and limitations. It should work for most cases when reading data that complies with the latest Parquet standard, but can run into issues for older data that does not fully comply with it.|JAVA
<a name="sql.format.parquet.reader.footer.type"></a>spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. AUTO is the default and decides which path to take using a heuristic. JAVA follows closely with what Apache Spark does. NATIVE will parse and filter the footer using C++.|AUTO
<a name="sql.format.parquet.reader.type"></a>spark.rapids.sql.format.parquet.reader.type|Sets the Parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.format.parquet.writer.int96.enabled"></a>spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true
32 changes: 3 additions & 29 deletions integration_tests/src/main/python/parquet_test.py
@@ -260,7 +260,7 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase,
[ArrayGen(decimal_gen_32bit, max_length=10)],
[StructGen([['child0', decimal_gen_32bit]])]], ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
@@ -365,7 +365,7 @@ def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reade
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -389,7 +389,7 @@ def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs
lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
@@ -415,32 +415,6 @@ def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, re
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs_native)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@allow_non_gpu('ColumnarToRowExec')
def test_parquet_read_merge_schema_native_fallback(spark_tmp_path, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens
first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
lambda spark: gen_df(spark, first_gen_list).write.parquet(first_data_path),
conf=rebase_write_legacy_conf)
second_gen_list = [(('_c' if i % 2 == 0 else '_b') + str(i), gen) for i, gen in enumerate(parquet_gens)]
second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
with_cpu_session(
lambda spark: gen_df(spark, second_gen_list).write.parquet(second_data_path),
conf=rebase_write_corrected_conf)
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_fallback_collect(
lambda spark: spark.read.option('mergeSchema', 'true').parquet(data_path),
cpu_fallback_class_name='FileSourceScanExec' if v1_enabled_list == 'parquet' else 'BatchScanExec',
conf=all_confs)

@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
def test_read_parquet_with_empty_clipped_schema(spark_tmp_path, v1_enabled_list, reader_confs):
@@ -30,14 +30,6 @@ object GpuReadParquetFileFormat {
val fsse = meta.wrapped
val session = fsse.sqlContext.sparkSession
GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta)

if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf)
if (options.mergeSchema) {
meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}
}
}

@@ -30,12 +30,6 @@ class RapidsParquetScanMeta(

override def tagSelfForGpu(): Unit = {
GpuParquetScan.tagSupport(this)

if (pScan.options.getBoolean("mergeSchema", false) &&
conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}

override def convertToGpu(): Scan = {
@@ -35,12 +35,6 @@ class RapidsParquetScanMeta(
willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}

if (pScan.options.getBoolean("mergeSchema", false) &&
conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}

override def convertToGpu(): Scan = {
@@ -40,12 +40,6 @@ class RapidsParquetScanMeta(
willNotWorkOnGpu(
"aggregates pushed into Parquet read, which is a metadata only operation")
}

if (pScan.options.getBoolean("mergeSchema", false) &&
conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}

override def convertToGpu(): Scan = {
@@ -249,6 +249,48 @@ object GpuParquetScan {
meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode")
}
}

/**
* This estimates the number of nodes in a parquet footer schema based off of the parquet spec
* Specifically https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
*/
private def numNodesEstimate(dt: DataType): Long = dt match {
case StructType(fields) =>
// A struct has a group node that holds the children
1 + fields.map(f => numNodesEstimate(f.dataType)).sum
case ArrayType(elementType, _) =>
// A List/Array has one group node to tag it as a list and another one
// that is marked as repeating.
2 + numNodesEstimate(elementType)
case MapType(keyType, valueType, _) =>
// A Map has one group node to tag it as a map and another one
// that is marked as repeating, but holds the key/value
2 + numNodesEstimate(keyType) + numNodesEstimate(valueType)
case _ =>
// All the other types are just value types and are represented by a non-group node
1
}

/**
* Adjust the footer reader type based off of a heuristic.
*/
def footerReaderHeuristic(
inputValue: ParquetFooterReaderType.Value,
data: StructType,
read: StructType): ParquetFooterReaderType.Value = {
inputValue match {
case ParquetFooterReaderType.AUTO =>
val dnc = numNodesEstimate(data)
val rnc = numNodesEstimate(read)
if (rnc.toDouble/dnc <= 0.5 && dnc - rnc > 10) {
ParquetFooterReaderType.NATIVE
} else {
ParquetFooterReaderType.JAVA
}
case other => other
}
}

}

/**
@@ -945,7 +987,8 @@ case class GpuParquetMultiFilePartitionReaderFactory(
private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix
private val numThreads = rapidsConf.multiThreadReadNumThreads
private val maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel
private val footerReadType = rapidsConf.parquetReaderFooterType
private val footerReadType = GpuParquetScan.footerReaderHeuristic(
rapidsConf.parquetReaderFooterType, dataSchema, readDataSchema)
private val ignoreMissingFiles = sqlConf.ignoreMissingFiles
private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
private val filterHandler = GpuParquetFileFilterHandler(sqlConf)
@@ -1047,7 +1090,8 @@ case class GpuParquetPartitionReaderFactory(
private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix
private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows
private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes
private val footerReadType = rapidsConf.parquetReaderFooterType
private val footerReadType = GpuParquetScan.footerReaderHeuristic(
rapidsConf.parquetReaderFooterType, dataSchema, readDataSchema)

private val filterHandler = GpuParquetFileFilterHandler(sqlConf)
private val readUseFieldId = ParquetSchemaClipShims.useFieldId(sqlConf)
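
To make the AUTO path above concrete, here is a small self-contained sketch (not part of the diff; the schemas and object name are made up) that reproduces the node-count estimate and shows a case where the heuristic would pick the NATIVE footer reader:

```scala
import org.apache.spark.sql.types._

object FooterHeuristicExample {
  // Same estimate as numNodesEstimate above: one node per leaf value, plus the
  // extra group nodes that structs, lists and maps introduce in the Parquet schema.
  def numNodes(dt: DataType): Long = dt match {
    case StructType(fields) => 1 + fields.map(f => numNodes(f.dataType)).sum
    case ArrayType(elementType, _) => 2 + numNodes(elementType)
    case MapType(keyType, valueType, _) => 2 + numNodes(keyType) + numNodes(valueType)
    case _ => 1
  }

  def main(args: Array[String]): Unit = {
    // A wide file schema (100 flat columns) of which the query reads only 5.
    val dataSchema = StructType((0 until 100).map(i => StructField(s"c$i", IntegerType)))
    val readSchema = StructType((0 until 5).map(i => StructField(s"c$i", IntegerType)))
    val dnc = numNodes(dataSchema) // 101
    val rnc = numNodes(readSchema) // 6
    // rnc/dnc <= 0.5 and dnc - rnc > 10, so AUTO would resolve to NATIVE here.
    val choice = if (rnc.toDouble / dnc <= 0.5 && dnc - rnc > 10) "NATIVE" else "JAVA"
    println(s"footer reader: $choice")
  }
}
```

Reading a narrow projection of a wide schema is exactly the case the config description calls out as expensive for the Java footer path, so the heuristic only switches to NATIVE when the read schema is both relatively (half or less) and absolutely (more than 10 nodes) smaller than the file schema.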
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -63,13 +63,5 @@ object GpuReadParquetFileFormat {
val fsse = meta.wrapped
val session = SparkShimImpl.sessionFromPlan(fsse)
GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta)

if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf)
if (options.mergeSchema) {
meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" +
" mergeSchema is enabled")
}
}
}
}
@@ -743,7 +743,7 @@ object RapidsConf {
.createWithDefault(true)

object ParquetFooterReaderType extends Enumeration {
val JAVA, NATIVE = Value
val JAVA, NATIVE, AUTO = Value
}

val PARQUET_READER_FOOTER_TYPE =
@@ -752,16 +752,13 @@
"happens when there are a large number of columns and relatively few " +
"of them are being read on a large number of files. " +
"This provides the ability to use a different path to parse and filter the footer. " +
"JAVA is the default and should match closely with Apache Spark. NATIVE will parse and " +
"filter the footer using C++. In the worst case this can be slower than JAVA, but " +
"not by much if anything. This is still a very experimental feature and there are " +
"known bugs and limitations. It should work for most cases when reading data that " +
"complies with the latest Parquet standard, but can run into issues for older data " +
"that does not fully comply with it.")
"AUTO is the default and decides which path to take using a heuristic. JAVA " +
"follows closely with what Apache Spark does. NATIVE will parse and " +
"filter the footer using C++.")
.stringConf
.transform(_.toUpperCase(java.util.Locale.ROOT))
.checkValues(ParquetFooterReaderType.values.map(_.toString))
.createWithDefault(ParquetFooterReaderType.JAVA.toString)
.createWithDefault(ParquetFooterReaderType.AUTO.toString)

// This is an experimental feature now. And eventually, should be enabled or disabled depending
// on something that we don't know yet but would try to figure out.
@@ -1744,6 +1741,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val parquetReaderFooterType: ParquetFooterReaderType.Value = {
get(PARQUET_READER_FOOTER_TYPE) match {
case "AUTO" => ParquetFooterReaderType.AUTO
case "NATIVE" => ParquetFooterReaderType.NATIVE
case "JAVA" => ParquetFooterReaderType.JAVA
case other =>
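
For reference, a hypothetical standalone sketch (the object and method names are illustrative, not from RapidsConf) of the same normalize, validate, and map pattern the config above applies to its string value:

```scala
import java.util.Locale

object FooterTypeParseExample {
  object ParquetFooterReaderType extends Enumeration {
    val JAVA, NATIVE, AUTO = Value
  }

  def parse(raw: String): ParquetFooterReaderType.Value = {
    // Mirror .transform(_.toUpperCase(...)) so "auto", "Auto" and "AUTO" are all accepted.
    val normalized = raw.toUpperCase(Locale.ROOT)
    // Mirror .checkValues(...): reject anything outside the enumeration.
    require(ParquetFooterReaderType.values.map(_.toString).contains(normalized),
      s"$raw is not a valid footer reader type")
    ParquetFooterReaderType.withName(normalized)
  }

  def main(args: Array[String]): Unit = {
    println(parse("auto"))   // AUTO
    println(parse("Native")) // NATIVE
  }
}
```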