From 75c21a425caca5e9ad939139e594bc426bb711f5 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 1 Jul 2022 14:11:05 -0500 Subject: [PATCH 1/5] Use the new API instead of the old one Signed-off-by: Robert (Bobby) Evans --- .../nvidia/spark/rapids/GpuParquetScan.scala | 53 +++++-------------- 1 file changed, 14 insertions(+), 39 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 0d0cd52e7e0..0097f345723 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -496,57 +496,32 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte } } - private def addNamesAndCount(names: ArrayBuffer[String], children: ArrayBuffer[Int], - name: String, numChildren: Int): Unit = { - names += name - children += numChildren - } - /** - * Flatten a Spark schema according to the parquet standard. This does not work for older - * parquet files that did not fully follow the standard, or were before some of these - * things were standardized. This will be fixed as a part of - * https://github.com/NVIDIA/spark-rapids-jni/issues/210 + * Convert the spark data type to something that the native processor can understand. */ - private def depthFirstNamesHelper(schema: DataType, elementName: String, makeLowerCase: Boolean, - names: ArrayBuffer[String], children: ArrayBuffer[Int]): Unit = { - val name = if (makeLowerCase) { - elementName.toLowerCase(Locale.ROOT) - } else { - elementName - } + private def convertToParquetNative(schema: DataType): ParquetFooter.SchemaElement = { schema match { case cst: StructType => - addNamesAndCount(names, children, name, cst.length) + val schemaBuilder = ParquetFooter.StructElement.builder() cst.fields.foreach { field => - depthFirstNamesHelper(field.dataType, field.name, makeLowerCase, names, children) + schemaBuilder.addChild(field.name, convertToParquetNative(field.dataType)) } + schemaBuilder.build() case _: NumericType | BinaryType | BooleanType | DateType | TimestampType | StringType => - addNamesAndCount(names, children, name, 0) + new ParquetFooter.ValueElement() case at: ArrayType => - addNamesAndCount(names, children, name, 1) - addNamesAndCount(names, children, "list", 1) - depthFirstNamesHelper(at.elementType, "element", makeLowerCase, names, children) + new ParquetFooter.ListElement(convertToParquetNative(at.elementType)) case mt: MapType => - addNamesAndCount(names, children, name, 1) - addNamesAndCount(names, children, "key_value", 2) - depthFirstNamesHelper(mt.keyType, "key", makeLowerCase, names, children) - depthFirstNamesHelper(mt.valueType, "value", makeLowerCase, names, children) + new ParquetFooter.MapElement( + convertToParquetNative(mt.keyType), + convertToParquetNative(mt.valueType)) case other => throw new UnsupportedOperationException(s"Need some help here $other...") } } - def depthFirstNames(schema: StructType, makeLowerCase: Boolean): (Array[String], Array[Int]) = { - withResource(new NvtxRange("prepare schema", NvtxColor.WHITE)) { _ => - // Initialize them with a quick length for non-nested values - val names = new ArrayBuffer[String](schema.length) - val children = new ArrayBuffer[Int](schema.length) - schema.fields.foreach { field => - depthFirstNamesHelper(field.dataType, field.name, makeLowerCase, names, children) - } - (names.toArray, children.toArray) - } + def 
convertToFooterSchema(schema: StructType): ParquetFooter.StructElement = { + convertToParquetNative(schema).asInstanceOf[ParquetFooter.StructElement] } def readAndFilterFooter( @@ -554,7 +529,7 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte conf : Configuration, readDataSchema: StructType, filePath: Path): ParquetFooter = { - val (names, children) = depthFirstNames(readDataSchema, !isCaseSensitive) + val footerSchema = convertToFooterSchema(readDataSchema) val fs = filePath.getFileSystem(conf) val stat = fs.getFileStatus(filePath) // Much of this code came from the parquet_mr projects ParquetFileReader, and was modified @@ -612,7 +587,7 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte file.length } ParquetFooter.readAndFilter(footerBuffer, file.start, len, - names, children, readDataSchema.length, !isCaseSensitive) + footerSchema, !isCaseSensitive) } } } From a4df6b235f2b6a060302e876c4e245b4281c817a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 8 Jul 2022 12:29:38 -0500 Subject: [PATCH 2/5] Adding AUTO native parquet support and legacy tests Signed-off-by: Robert (Bobby) Evans --- docs/configs.md | 2 +- .../src/main/python/parquet_test.py | 32 ++------------- .../spark/rapids/GpuParquetScanBase.scala | 8 ---- .../rapids/shims/RapidsParquetScanMeta.scala | 6 --- .../rapids/shims/RapidsParquetScanMeta.scala | 6 --- .../rapids/shims/RapidsParquetScanMeta.scala | 6 --- .../nvidia/spark/rapids/GpuParquetScan.scala | 39 ++++++++++++++++++- .../rapids/GpuReadParquetFileFormat.scala | 10 +---- .../com/nvidia/spark/rapids/RapidsConf.scala | 15 ++++--- 9 files changed, 49 insertions(+), 75 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index d67a6c9f22c..1f7b24f9379 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -93,7 +93,7 @@ Name | Description | Default Value spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|2147483647 spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small Parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true -spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. JAVA is the default and should match closely with Apache Spark. NATIVE will parse and filter the footer using C++. In the worst case this can be slower than JAVA, but not by much if anything. This is still a very experimental feature and there are known bugs and limitations. 
It should work for most cases when reading data that complies with the latest Parquet standard, but can run into issues for older data that does not fully comply with it.|JAVA
+spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. AUTO is the default and decides which path to take using a heuristic. JAVA follows closely with what Apache Spark does. NATIVE will parse and filter the footer using C++. In the worst case this can be slower than JAVA. This is why a heuristic is used to select the appropriate parser.|AUTO
 spark.rapids.sql.format.parquet.reader.type|Sets the Parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. 
See spark.rapids.cloudSchemes.|AUTO spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 6bd839f789b..a2e3d970459 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -257,7 +257,7 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, [ArrayGen(decimal_gen_32bit, max_length=10)], [StructGen([['child0', decimal_gen_32bit]])]], ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) -@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] @@ -362,7 +362,7 @@ def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reade lambda spark : spark.read.parquet(data_path), conf=all_confs) -@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed @@ -386,7 +386,7 @@ def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path), conf=all_confs) -@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed @@ -412,32 +412,6 @@ def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, re lambda spark : spark.read.parquet(data_path), conf=all_confs) -@pytest.mark.parametrize('reader_confs', reader_opt_confs_native) -@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -@allow_non_gpu('ColumnarToRowExec') -def test_parquet_read_merge_schema_native_fallback(spark_tmp_path, v1_enabled_list, reader_confs): - # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed - # we should go with a more standard set of generators - parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens - first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] - first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' - with_cpu_session( - lambda spark: gen_df(spark, first_gen_list).write.parquet(first_data_path), - conf=rebase_write_legacy_conf) - second_gen_list 
= [(('_c' if i % 2 == 0 else '_b') + str(i), gen) for i, gen in enumerate(parquet_gens)] - second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1' - with_cpu_session( - lambda spark: gen_df(spark, second_gen_list).write.parquet(second_data_path), - conf=rebase_write_corrected_conf) - data_path = spark_tmp_path + '/PARQUET_DATA' - all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) - assert_gpu_fallback_collect( - lambda spark: spark.read.option('mergeSchema', 'true').parquet(data_path), - cpu_fallback_class_name='FileSourceScanExec' if v1_enabled_list == 'parquet' else 'BatchScanExec', - conf=all_confs) - @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) def test_read_parquet_with_empty_clipped_schema(spark_tmp_path, v1_enabled_list, reader_confs): diff --git a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index 525f32c8ca6..7a351f4bba4 100644 --- a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -30,14 +30,6 @@ object GpuReadParquetFileFormat { val fsse = meta.wrapped val session = fsse.sqlContext.sparkSession GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta) - - if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) { - val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf) - if (options.mergeSchema) { - meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" + - " mergeSchema is enabled") - } - } } } diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala index 7892c02a0aa..5c10748cbd2 100644 --- a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala +++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala @@ -30,12 +30,6 @@ class RapidsParquetScanMeta( override def tagSelfForGpu(): Unit = { GpuParquetScan.tagSupport(this) - - if (pScan.options.getBoolean("mergeSchema", false) && - conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) { - willNotWorkOnGpu("Native footer reader for parquet does not work when" + - " mergeSchema is enabled") - } } override def convertToGpu(): Scan = { diff --git a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala index 60f5c3abc54..f94df07f13b 100644 --- a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala +++ b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala @@ -35,12 +35,6 @@ class RapidsParquetScanMeta( willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" + " on datasource V2 yet.") } - - if (pScan.options.getBoolean("mergeSchema", false) && - conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) { - willNotWorkOnGpu("Native footer reader for parquet does not work when" + - " mergeSchema is enabled") - } } override def convertToGpu(): Scan = { 
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
index a251f29f1eb..05070f7041c 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsParquetScanMeta.scala
@@ -40,12 +40,6 @@ class RapidsParquetScanMeta(
       willNotWorkOnGpu(
         "aggregates pushed into Parquet read, which is a metadata only operation")
     }
-
-    if (pScan.options.getBoolean("mergeSchema", false) &&
-      conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) {
-      willNotWorkOnGpu("Native footer reader for parquet does not work when" +
-        " mergeSchema is enabled")
-    }
   }
 
   override def convertToGpu(): Scan = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 0097f345723..a1cf90c8bd2 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -249,6 +249,39 @@ object GpuParquetScan {
       meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode")
     }
   }
+
+  private def numNodesEstimate(dt: DataType): Long = dt match {
+    case StructType(fields) =>
+      1 + fields.map(f => numNodesEstimate(f.dataType)).sum
+    case ArrayType(elementType, _) =>
+      // In parquet there are two groups needed to make a List/Array
+      2 + numNodesEstimate(elementType)
+    case MapType(keyType, valueType, _) =>
+      // In parquet there is a repeating group followed by a key/value group
+      2 + numNodesEstimate(keyType) + numNodesEstimate(valueType)
+    case _ => 1
+  }
+
+  /**
+   * Adjust the footer reader type based on a heuristic. 
+ */ + def footerReaderHeuristic( + inputValue: ParquetFooterReaderType.Value, + data: StructType, + read: StructType): ParquetFooterReaderType.Value = { + inputValue match { + case ParquetFooterReaderType.AUTO => + val dnc = numNodesEstimate(data) + val rnc = numNodesEstimate(read) + if (rnc.toDouble/dnc <= 0.5 && dnc - rnc > 10) { + ParquetFooterReaderType.NATIVE + } else { + ParquetFooterReaderType.JAVA + } + case other => other + } + } + } /** @@ -945,7 +978,8 @@ case class GpuParquetMultiFilePartitionReaderFactory( private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val numThreads = rapidsConf.multiThreadReadNumThreads private val maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel - private val footerReadType = rapidsConf.parquetReaderFooterType + private val footerReadType = GpuParquetScan.footerReaderHeuristic( + rapidsConf.parquetReaderFooterType, dataSchema, readDataSchema) private val ignoreMissingFiles = sqlConf.ignoreMissingFiles private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles private val filterHandler = GpuParquetFileFilterHandler(sqlConf) @@ -1047,7 +1081,8 @@ case class GpuParquetPartitionReaderFactory( private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes - private val footerReadType = rapidsConf.parquetReaderFooterType + private val footerReadType = GpuParquetScan.footerReaderHeuristic( + rapidsConf.parquetReaderFooterType, dataSchema, readDataSchema) private val filterHandler = GpuParquetFileFilterHandler(sqlConf) private val readUseFieldId = ParquetSchemaClipShims.useFieldId(sqlConf) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala index 407aa1e1041..504ed0f4ae1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadParquetFileFormat.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -63,13 +63,5 @@ object GpuReadParquetFileFormat { val fsse = meta.wrapped val session = SparkShimImpl.sessionFromPlan(fsse) GpuParquetScan.tagSupport(session, fsse.requiredSchema, meta) - - if (meta.conf.parquetReaderFooterType == RapidsConf.ParquetFooterReaderType.NATIVE) { - val options = new ParquetOptions(fsse.relation.options, session.sessionState.conf) - if (options.mergeSchema) { - meta.willNotWorkOnGpu("Native footer reader for parquet does not work when" + - " mergeSchema is enabled") - } - } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 65714baf775..e5722ec756e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -743,7 +743,7 @@ object RapidsConf { 
     .createWithDefault(true)
 
   object ParquetFooterReaderType extends Enumeration {
-    val JAVA, NATIVE = Value
+    val JAVA, NATIVE, AUTO = Value
   }
 
   val PARQUET_READER_FOOTER_TYPE =
@@ -752,16 +752,14 @@ object RapidsConf {
       "happens when there are a large number of columns and relatively few " +
       "of them are being read on a large number of files. " +
       "This provides the ability to use a different path to parse and filter the footer. " +
-      "JAVA is the default and should match closely with Apache Spark. NATIVE will parse and " +
-      "filter the footer using C++. In the worst case this can be slower than JAVA, but " +
-      "not by much if anything. This is still a very experimental feature and there are " +
-      "known bugs and limitations. It should work for most cases when reading data that " +
-      "complies with the latest Parquet standard, but can run into issues for older data " +
-      "that does not fully comply with it.")
+      "AUTO is the default and decides which path to take using a heuristic. JAVA " +
+      "follows closely with what Apache Spark does. NATIVE will parse and " +
+      "filter the footer using C++. In the worst case this can be slower than JAVA. " +
+      "This is why a heuristic is used to select the appropriate parser.")
       .stringConf
       .transform(_.toUpperCase(java.util.Locale.ROOT))
       .checkValues(ParquetFooterReaderType.values.map(_.toString))
-    .createWithDefault(ParquetFooterReaderType.JAVA.toString)
+    .createWithDefault(ParquetFooterReaderType.AUTO.toString)
 
   // This is an experimental feature now. And eventually, should be enabled or disabled depending
   // on something that we don't know yet but would try to figure out.
@@ -1737,6 +1735,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
   lazy val parquetReaderFooterType: ParquetFooterReaderType.Value = {
     get(PARQUET_READER_FOOTER_TYPE) match {
+      case "AUTO" => ParquetFooterReaderType.AUTO
       case "NATIVE" => ParquetFooterReaderType.NATIVE
       case "JAVA" => ParquetFooterReaderType.JAVA
       case other =>

From c66945f9da211f6d55cc85d6ba50f4a1ebebe0a6 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Tue, 12 Jul 2022 13:55:05 -0500
Subject: [PATCH 3/5] Addressed review comments

---
 .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 7ed92a0b4fe..b2f9cfab180 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -754,8 +754,7 @@ object RapidsConf {
       "This provides the ability to use a different path to parse and filter the footer. " +
       "AUTO is the default and decides which path to take using a heuristic. JAVA " +
       "follows closely with what Apache Spark does. NATIVE will parse and " +
-      "filter the footer using C++. In the worst case this can be slower than JAVA. " +
-      "This is why a heuristic is used to select the appropriate parser.")
+      "filter the footer using C++.")
       .stringConf
       .transform(_.toUpperCase(java.util.Locale.ROOT))
       .checkValues(ParquetFooterReaderType.values.map(_.toString))

From d14ed23ac569150d2257770aa1ddb4cbcaa0508e Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Mon, 18 Jul 2022 11:15:23 -0500
Subject: [PATCH 4/5] Docs

---
 docs/configs.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configs.md b/docs/configs.md
index 9afdeced6a1..8276c1a3ef4 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -94,7 +94,7 @@ Name | Description | Default Value
 spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|2147483647
 spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small Parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None
 spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
-spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. AUTO is the default and decides which path to take using a heuristic. JAVA follows closely with what Apache Spark does. NATIVE will parse and filter the footer using C++. In the worst case this can be slower than JAVA. This is why a heuristic is used to select the appropriate parser.|AUTO
+spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. AUTO is the default and decides which path to take using a heuristic. JAVA follows closely with what Apache Spark does. NATIVE will parse and filter the footer using C++.|AUTO
 spark.rapids.sql.format.parquet.reader.type|Sets the Parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.multiThreadedRead.numThreads. 
MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO
 spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
 spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true

From 917854f052f1a01e4a18daf98458cad9aacbd551 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Tue, 19 Jul 2022 09:35:05 -0500
Subject: [PATCH 5/5] Added more comments

---
 .../com/nvidia/spark/rapids/GpuParquetScan.scala | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index a1cf90c8bd2..0f2932ea590 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -250,16 +250,25 @@ object GpuParquetScan {
     }
   }
 
+  /**
+   * This estimates the number of nodes in a parquet footer schema based on the parquet spec,
+   * specifically https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+   */
   private def numNodesEstimate(dt: DataType): Long = dt match {
     case StructType(fields) =>
+      // A struct has a group node that holds the children
       1 + fields.map(f => numNodesEstimate(f.dataType)).sum
     case ArrayType(elementType, _) =>
-      // In parquet there are two groups needed to make a List/Array
+      // A List/Array has one group node to tag it as a list and another one
+      // that is marked as repeating.
      2 + numNodesEstimate(elementType)
     case MapType(keyType, valueType, _) =>
-      // In parquet there is a repeating group followed by a key/value group
+      // A Map has one group node to tag it as a map and another one
+      // that is marked as repeating, but holds the key/value
       2 + numNodesEstimate(keyType) + numNodesEstimate(valueType)
-    case _ => 1
+    case _ =>
+      // All the other types are just value types and are represented by a non-group node
+      1
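
Reviewer note on the AUTO mode added in this series: it comes down to two small pieces, a rough count of the nodes the footer schema would contain and a ratio test between the full file schema and the columns actually requested. The sketch below restates that logic outside the plugin so it can be run on its own. numNodesEstimate and the 0.5 / 10-node thresholds mirror the hunks above; the object name, the chooseNative helper, and the example schemas are made up for illustration and assume only the spark-sql types on the classpath.

import org.apache.spark.sql.types._

object FooterReaderHeuristicSketch {
  // Rough size of the Parquet footer schema: a struct adds one group node, a
  // list or map adds two group nodes, and every leaf type adds a single node.
  private def numNodesEstimate(dt: DataType): Long = dt match {
    case StructType(fields) =>
      1 + fields.map(f => numNodesEstimate(f.dataType)).sum
    case ArrayType(elementType, _) =>
      2 + numNodesEstimate(elementType)
    case MapType(keyType, valueType, _) =>
      2 + numNodesEstimate(keyType) + numNodesEstimate(valueType)
    case _ => 1
  }

  // AUTO keeps the JAVA parser unless the read schema is at most half the size of
  // the file schema and at least 10 nodes smaller, the same test as in the patch.
  def chooseNative(dataSchema: StructType, readSchema: StructType): Boolean = {
    val dnc = numNodesEstimate(dataSchema)
    val rnc = numNodesEstimate(readSchema)
    rnc.toDouble / dnc <= 0.5 && dnc - rnc > 10
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical wide table: 40 long columns in the file, only 2 of them read.
    val dataSchema = StructType((0 until 40).map(i => StructField(s"c$i", LongType)))
    val readSchema = StructType(Seq(StructField("c0", LongType), StructField("c1", LongType)))
    println(chooseNative(dataSchema, readSchema)) // true: 3.0 / 41 <= 0.5 and 41 - 3 > 10
  }
}

With the wide file schema and a two-column read the estimate is well under half the size and more than ten nodes smaller, so AUTO would take the native footer parser; a read that touches most of the columns stays on the JAVA path.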
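Looking back at the first patch, the flattened name/child-count arrays are replaced by a structured schema built through the ParquetFooter builder API from spark-rapids-jni (ParquetFooter.StructElement.builder(), addChild, ValueElement, ListElement, MapElement, as they appear in that diff). The stand-alone sketch below mimics the same recursion with local stand-in classes so it does not need that library; the stand-in names, the convert helper, and the example schema are illustrative only, and the catch-all leaf case is a simplification of the real code, which rejects unsupported types.

import org.apache.spark.sql.types._

// Local stand-ins for the ParquetFooter schema elements used in the first patch.
// The real StructElement/ValueElement/ListElement/MapElement classes come from
// spark-rapids-jni; these exist only so the sketch runs without that dependency.
sealed trait Element
case object ValueElem extends Element
case class ListElem(child: Element) extends Element
case class MapElem(key: Element, value: Element) extends Element
case class StructElem(children: List[(String, Element)]) extends Element

object FooterSchemaSketch {
  // Same shape as convertToParquetNative: structs keep their field names, lists and
  // maps wrap their children, everything else becomes a leaf value. (The real code
  // throws for types it does not support; this sketch simply treats them as leaves.)
  def convert(dt: DataType): Element = dt match {
    case st: StructType =>
      StructElem(st.fields.toList.map(f => f.name -> convert(f.dataType)))
    case at: ArrayType => ListElem(convert(at.elementType))
    case mt: MapType => MapElem(convert(mt.keyType), convert(mt.valueType))
    case _ => ValueElem
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical read schema: one scalar, one array column, one map column.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("tags", ArrayType(StringType)),
      StructField("attrs", MapType(StringType, IntegerType))))
    println(convert(schema))
    // StructElem(List((id,ValueElem), (tags,ListElem(ValueElem)), (attrs,MapElem(ValueElem,ValueElem))))
  }
}

Passing the full tree to the native filter, rather than parallel name and child-count arrays, is what lets the later patches drop the mergeSchema restriction and the older-Parquet caveats from the config description.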