From 61f7a2eebedc7f024deb58c335b07bd929c9462d Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 25 Apr 2022 14:07:06 +0800 Subject: [PATCH 1/8] Support coalescing reading for avro Signed-off-by: Firestarman --- docs/configs.md | 3 + integration_tests/run_pyspark_from_build.sh | 9 +- .../src/main/python/avro_test.py | 33 +- .../nvidia/spark/rapids/AvroDataReader.scala | 99 ++- .../nvidia/spark/rapids/AvroFileWriter.scala | 53 ++ .../spark/rapids/GpuMultiFileReader.scala | 28 +- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 23 +- .../nvidia/spark/rapids/GpuParquetScan.scala | 23 +- .../spark/rapids/GpuTransitionOverrides.scala | 7 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 96 ++- .../spark/sql/rapids/ExternalSource.scala | 116 ++- .../apache/spark/sql/rapids/GpuAvroScan.scala | 789 ++++++++++++------ .../sql/rapids/GpuFileSourceScanExec.scala | 23 +- .../sql/rapids/GpuReadAvroFileFormat.scala | 7 +- tests/pom.xml | 24 + ...Suites.scala => GpuReaderTypeSuites.scala} | 120 ++- 16 files changed, 1047 insertions(+), 406 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroFileWriter.scala rename tests/src/test/scala/com/nvidia/spark/rapids/{GpuReaderSuites.scala => GpuReaderTypeSuites.scala} (51%) diff --git a/docs/configs.md b/docs/configs.md index 4daf78ecce8..91809cd2e9c 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -75,7 +75,10 @@ Name | Description | Default Value spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false +spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type|2147483647 +spark.rapids.sql.format.avro.multiThreadedRead.numThreads|The maximum number of threads, on one executor, to use for reading small avro files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type.|20 spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false +spark.rapids.sql.format.avro.reader.type|Sets the avro reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using COALESCING is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.avro.multiThreadedRead.numThreads. 
By default this is set to AUTO so we select the reader we think is best. This will be COALESCING.|AUTO spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index a1302572665..39295a1abb2 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -57,13 +57,10 @@ else TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar) fi - # `./run_pyspark_from_build.sh` runs all tests including avro_test.py with spark-avro.jar - # in the classpath. + # `./run_pyspark_from_build.sh` runs all the tests excluding the avro tests in 'avro_test.py'. # - # `./run_pyspark_from_build.sh -k xxx ` runs all xxx tests with spark-avro.jar in the classpath - # - # `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` run all tests (except the marker skipif()) - # without spark-avro.jar + # `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` runs all the tests, including the tests + # in 'avro_test.py'. if [[ $( echo ${INCLUDE_SPARK_AVRO_JAR} | tr [:upper:] [:lower:] ) == "true" ]]; then export INCLUDE_SPARK_AVRO_JAR=true diff --git a/integration_tests/src/main/python/avro_test.py b/integration_tests/src/main/python/avro_test.py index 418701d8e8e..43603563bf3 100644 --- a/integration_tests/src/main/python/avro_test.py +++ b/integration_tests/src/main/python/avro_test.py @@ -30,46 +30,56 @@ 'spark.rapids.sql.format.avro.enabled': 'true', 'spark.rapids.sql.format.avro.read.enabled': 'true'} +rapids_reader_types = ['PERFILE', 'COALESCING'] -@pytest.mark.parametrize('gen', support_gens) -@pytest.mark.parametrize('v1_enabled_list', ["avro", ""]) -def test_basic_read(spark_tmp_path, gen, v1_enabled_list): +@pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"]) +@pytest.mark.parametrize('reader_type', rapids_reader_types) +def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type): + gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)] data_path = spark_tmp_path + '/AVRO_DATA' + # 50 files for the coalescing reading case with_cpu_session( - lambda spark: unary_op_df(spark, gen).write.format("avro").save(data_path) + lambda spark: gen_df(spark, gen_list).repartition(50).write.format("avro").save(data_path) ) all_confs = copy_and_update(_enable_all_types_conf, { + 'spark.rapids.sql.format.avro.reader.type': reader_type, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.read.format("avro").load(data_path), conf=all_confs) -@pytest.mark.parametrize('v1_enabled_list', ["", "avro"]) -def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list): +@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"]) +@pytest.mark.parametrize('reader_type', rapids_reader_types) +def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_type): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)] first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20' with_cpu_session( - lambda spark: gen_df(spark, 
gen_list).write.format("avro").save(first_data_path)) + lambda spark: gen_df(spark, + gen_list).repartition(50).write.format("avro").save(first_data_path)) second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21' with_cpu_session( - lambda spark: gen_df(spark, gen_list).write.format("avro").save(second_data_path)) + lambda spark: gen_df(spark, + gen_list).repartition(50).write.format("avro").save(second_data_path)) third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22' with_cpu_session( - lambda spark: gen_df(spark, gen_list).write.format("avro").save(third_data_path)) + lambda spark: gen_df(spark, + gen_list).repartition(50).write.format("avro").save(third_data_path)) data_path = spark_tmp_path + '/AVRO_DATA' all_confs = copy_and_update(_enable_all_types_conf, { + 'spark.rapids.sql.format.avro.reader.type': reader_type, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.read.format("avro").load(data_path), conf=all_confs) -@pytest.mark.parametrize('v1_enabled_list', ["", "avro"]) -def test_avro_input_meta(spark_tmp_path, v1_enabled_list): +@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"]) +@pytest.mark.parametrize('reader_type', rapids_reader_types) +def test_avro_input_meta(spark_tmp_path, v1_enabled_list, reader_type): first_data_path = spark_tmp_path + '/AVRO_DATA/key=0' with_cpu_session( lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path)) @@ -79,6 +89,7 @@ def test_avro_input_meta(spark_tmp_path, v1_enabled_list): data_path = spark_tmp_path + '/AVRO_DATA' all_confs = copy_and_update(_enable_all_types_conf, { + 'spark.rapids.sql.format.avro.reader.type': reader_type, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.read.format("avro").load(data_path) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala index 1b999ee40ad..0672076396b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala @@ -19,12 +19,12 @@ package com.nvidia.spark.rapids import java.io.{InputStream, IOException} import java.nio.charset.StandardCharsets -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import org.apache.avro.Schema import org.apache.avro.file.{DataFileConstants, SeekableInput} -import org.apache.avro.file.DataFileConstants.{MAGIC, SYNC_SIZE} import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory} +import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput { var oneByte = new Array[Byte](1) @@ -59,22 +59,68 @@ private class SeekableInputStream(in: SeekableInput) extends InputStream with Se } /** - * The header information of Avro file + * The header information of an Avro file. 
*/ -class Header { - var meta = Map[String, Array[Byte]]() - var metaKeyList = ArrayBuffer[String]() - var sync = new Array[Byte](DataFileConstants.SYNC_SIZE) - var schema: Schema = _ - private var firstBlockStart: Long = _ - - private[rapids] def update(schemaValue: String, firstBlockStart: Long) = { - schema = new Schema.Parser().setValidate(false).setValidateDefaults(false) - .parse(schemaValue) - this.firstBlockStart = firstBlockStart +class Header private[rapids] { + private[rapids] val meta = mutable.Map[String, Array[Byte]]() + private[rapids] val sync = new Array[Byte](DataFileConstants.SYNC_SIZE) + private[rapids] var headerSize: Option[Long] = None + + def firstBlockStart: Long = headerSize.getOrElse { + val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM) + AvroFileWriter(out).writeHeader(this) + val newSize = out.getByteCount + headerSize = Some(newSize) + newSize } - def getFirstBlockStart: Long = firstBlockStart + @transient + lazy val schema: Schema = { + getMetaString(DataFileConstants.SCHEMA) + .map(s => new Schema.Parser().setValidateDefaults(false).setValidate(false).parse(s)) + .orNull + } + + private def getMetaString(key: String): Option[String] = { + meta.get(key).map(new String(_, StandardCharsets.UTF_8)) + } +} + +object Header { + /** + * Merge the metadata of the given headers. + * Note: It does not check the compatibility of the headers. + * @param headers whose metadata to be merged. + * @return the first header but having the new merged metadata, or + * None if the input is empty. + */ + def mergeMetadata(headers: Seq[Header]): Option[Header] = { + if (headers.isEmpty) { + None + } else if (headers.size == 1) { + Some(headers.head) + } else { + val mergedHeader = headers.reduce { (merged, h) => + merged.meta ++= h.meta + merged + } + // need to re-compute the header size + mergedHeader.headerSize = None + Some(mergedHeader) + } + } + + /** Test whether the two headers have the same sync marker */ + def hasSameSync(h1: Header, h2: Header): Boolean = h1.sync.sameElements(h2.sync) + + /** + * Test whether the two headers have conflicts in the metadata. + * A conflict means a key exists in both of the two headers' metadata, + * and maps to different values. 
+ */ + def hasConflictInMetadata(h1: Header, h2: Header): Boolean = h1.meta.exists { + case (k, v) => h2.meta.contains(k) && !h2.meta.get(k).get.sameElements(v) + } } /** @@ -98,18 +144,16 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { private var firstBlockStart: Long = 0 // store all blocks info - private val blocks: ArrayBuffer[BlockInfo] = ArrayBuffer.empty + private val blocks: mutable.ArrayBuffer[BlockInfo] = mutable.ArrayBuffer.empty initialize() - def getBlocks(): ArrayBuffer[BlockInfo] = { - blocks - } + def getBlocks(): Seq[BlockInfo] = blocks.toSeq def getHeader(): Header = header private def initialize() = { - val magic = new Array[Byte](MAGIC.length) + val magic = new Array[Byte](DataFileConstants.MAGIC.length) vin.readFixed(magic) magic match { @@ -128,14 +172,13 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { val bb = new Array[Byte](value.remaining()) value.get(bb) header.meta += (key -> bb) - header.metaKeyList += key } l = vin.mapNext().toInt } while (l != 0) } vin.readFixed(header.sync) firstBlockStart = sin.tell - vin.inputStream.available // get the first block Start address - header.update(getMetaString(DataFileConstants.SCHEMA), firstBlockStart) + header.headerSize = Some(firstBlockStart) parseBlocks() } @@ -168,7 +211,8 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { val blockDataSizeLen: Int = BinaryData.encodeLong(blockDataSize, buf, 0) // (len of entries) + (len of block size) + (block size) + (sync size) - val blockLength = blockCountLen + blockDataSizeLen + blockDataSize + SYNC_SIZE + val blockLength = blockCountLen + blockDataSizeLen + blockDataSize + + DataFileConstants.SYNC_SIZE blocks += BlockInfo(blockStart, blockLength, blockDataSize, blockCount) // Do we need to check the SYNC BUFFER, or just let cudf do it? @@ -176,15 +220,6 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { } } - /** Return the value of a metadata property. */ - private def getMeta(key: String): Array[Byte] = header.meta.getOrElse(key, new Array[Byte](1)) - - private def getMetaString(key: String): String = { - val value = getMeta(key) - if (value == null) return null - new String(value, StandardCharsets.UTF_8) - } - override def close(): Unit = { vin.inputStream().close() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroFileWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroFileWriter.scala new file mode 100644 index 00000000000..470be97ea47 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroFileWriter.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import java.io.OutputStream + +import org.apache.avro.file.DataFileConstants +import org.apache.avro.io.EncoderFactory + +/** + * AvroDataWriter, used to write a avro file header to the output stream. 
+ */ +class AvroFileWriter(os: OutputStream) { + + private val vout = new EncoderFactory().directBinaryEncoder(os, null) + + final def writeHeader(header: Header): Unit = { + val meta = header.meta + // 1) write magic + vout.writeFixed(DataFileConstants.MAGIC) + // 2) write metadata + vout.writeMapStart() + vout.setItemCount(meta.size) + meta.foreach{ case (key, value) => + vout.startItem() + vout.writeString(key) + vout.writeBytes(value) + } + vout.writeMapEnd() + // 3) write initial sync + vout.writeFixed(header.sync) + vout.flush() + } +} + +object AvroFileWriter { + + def apply(os: OutputStream): AvroFileWriter = new AvroFileWriter(os) +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index b94448e0558..5cde655b271 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -114,6 +114,29 @@ object MultiFileThreadPoolUtil { } } +/** A thread pool for multi-file reading */ +class MultiFileReaderThreadPool { + private var threadPool: Option[ThreadPoolExecutor] = None + + private def initThreadPool( + threadTag: String, + numThreads: Int): ThreadPoolExecutor = synchronized { + if (threadPool.isEmpty) { + threadPool = Some(MultiFileThreadPoolUtil.createThreadPool(threadTag, numThreads)) + } + threadPool.get + } + + /** + * Get the existing internal thread pool or create one with the given tag and thread + * number if it does not exist. + * Note: The tag and thread number will be ignored if the thread pool is already created. + */ + def getOrCreateThreadPool(threadTag: String, numThreads: Int): ThreadPoolExecutor = { + threadPool.getOrElse(initThreadPool(threadTag, numThreads)) + } +} + /** * The base multi-file partition reader factory to create the cloud reading or * coalescing reading respectively. @@ -692,10 +715,11 @@ abstract class MultiFileCoalescingPartitionReaderBase( * Write a header for a specific file format. If there is no header for the file format, * just ignore it and return 0 * + * @param paths the paths of files to be coalcesed into a single batch * @param buffer where the header will be written * @return how many bytes written */ - def writeFileHeader(buffer: HostMemoryBuffer): Long + def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long /** * Writer a footer for a specific file format. If there is no footer for the file format, @@ -823,7 +847,7 @@ abstract class MultiFileCoalescingPartitionReaderBase( val (buffer, bufferSize, footerOffset, outBlocks) = closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb => // Second, write header - var offset = writeFileHeader(hmb) + var offset = writeFileHeader(filesAndBlocks.keys.toSeq, hmb) val allOutputBlocks = scala.collection.mutable.ArrayBuffer[DataBlockBase]() val tc = TaskContext.get diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index c06fbfa235f..81cccb6a980 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -721,22 +721,7 @@ class GpuOrcPartitionReader( // Singleton threadpool that is used across all the tasks. // Please note that the TaskContext is not set in these threads and should not be used. 
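The shared MultiFileReaderThreadPool introduced above lets each format expose a singleton pool instead of duplicating the lazy-initialization logic (the ORC and Parquet thread-pool factories that follow collapse into one-line objects). A minimal sketch of the intended usage; the names DemoMultiFileThreadPool and acquire are illustrative only and not part of the patch:

import java.util.concurrent.ThreadPoolExecutor
import com.nvidia.spark.rapids.MultiFileReaderThreadPool

// A format-specific singleton only needs to extend the shared base class.
object DemoMultiFileThreadPool extends MultiFileReaderThreadPool

object DemoMultiFileThreadPoolUsage {
  def acquire(numThreads: Int): ThreadPoolExecutor = {
    // The tag and thread count are honored only on the first call; afterwards the
    // already-created executor is returned unchanged.
    DemoMultiFileThreadPool.getOrCreateThreadPool("DEMO", numThreads)
  }
}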
-object OrcMultiFileThreadPoolFactory { - private var threadPool: Option[ThreadPoolExecutor] = None - - private def initThreadPool( - threadTag: String, - numThreads: Int): ThreadPoolExecutor = synchronized { - if (threadPool.isEmpty) { - threadPool = Some(MultiFileThreadPoolUtil.createThreadPool(threadTag, numThreads)) - } - threadPool.get - } - - def getThreadPool(threadTag: String, numThreads: Int): ThreadPoolExecutor = { - threadPool.getOrElse(initThreadPool(threadTag, numThreads)) - } -} +object OrcMultiFileThreadPool extends MultiFileReaderThreadPool private object OrcTools extends Arm { @@ -1517,7 +1502,7 @@ class MultiFileCloudOrcPartitionReader( * @return ThreadPoolExecutor */ override def getThreadPool(numThreads: Int): ThreadPoolExecutor = { - OrcMultiFileThreadPoolFactory.getThreadPool(getFileFormatShortName, numThreads) + OrcMultiFileThreadPool.getOrCreateThreadPool(getFileFormatShortName, numThreads) } /** @@ -1943,7 +1928,7 @@ class MultiFileOrcPartitionReader( * @return ThreadPoolExecutor */ override def getThreadPool(numThreads: Int): ThreadPoolExecutor = { - OrcMultiFileThreadPoolFactory.getThreadPool(getFileFormatShortName, numThreads) + OrcMultiFileThreadPool.getOrCreateThreadPool(getFileFormatShortName, numThreads) } /** @@ -2024,7 +2009,7 @@ class MultiFileOrcPartitionReader( * @param buffer where the header will be written * @return how many bytes written */ - override def writeFileHeader(buffer: HostMemoryBuffer): Long = { + override def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long = { withResource(new HostMemoryOutputStream(buffer)) { out => withResource(new DataOutputStream(out)) { dataOut => dataOut.writeBytes(OrcFile.MAGIC) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index ce19f727329..71c29c04222 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -992,22 +992,7 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics // Singleton threadpool that is used across all the tasks. // Please note that the TaskContext is not set in these threads and should not be used. 
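writeFileHeader gains a paths argument in this patch; ORC (above) and Parquet (below) simply ignore it and write their fixed magic bytes, while the Avro coalescing reader needs the paths to merge the per-file headers into the single header it writes. A hedged sketch of that Avro-side use, assuming a Path-to-Header map like the mapPathHeader kept later in this patch; this is illustrative, not the literal implementation:

import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.{AvroFileWriter, Header, HostMemoryOutputStream}
import org.apache.hadoop.fs.Path

object AvroHeaderWriteSketch {
  def writeFileHeader(
      paths: Seq[Path],
      mapPathHeader: Map[Path, Header],
      buffer: HostMemoryBuffer): Long = {
    // Merge the metadata of every coalesced file's header into one header.
    val merged = Header.mergeMetadata(paths.map(mapPathHeader))
      .getOrElse(throw new IllegalArgumentException("no Avro headers to merge"))
    // Serialize magic + metadata map + sync marker into the host buffer and report
    // the offset at which the first copied block will start.
    val out = new HostMemoryOutputStream(buffer)
    AvroFileWriter(out).writeHeader(merged)
    out.getPos
  }
}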
-object ParquetMultiFileThreadPoolFactory { - private var threadPool: Option[ThreadPoolExecutor] = None - - private def initThreadPool( - threadTag: String, - numThreads: Int): ThreadPoolExecutor = synchronized { - if (threadPool.isEmpty) { - threadPool = Some(MultiFileThreadPoolUtil.createThreadPool(threadTag, numThreads)) - } - threadPool.get - } - - def getThreadPool(threadTag: String, numThreads: Int): ThreadPoolExecutor = { - threadPool.getOrElse(initThreadPool(threadTag, numThreads)) - } -} +object ParquetMultiFileThreadPool extends MultiFileReaderThreadPool // Parquet schema wrapper private case class ParquetSchemaWrapper(schema: MessageType) extends SchemaBase @@ -1167,7 +1152,7 @@ class MultiFileParquetPartitionReader( } override def getThreadPool(numThreads: Int): ThreadPoolExecutor = { - ParquetMultiFileThreadPoolFactory.getThreadPool(getFileFormatShortName, numThreads) + ParquetMultiFileThreadPool.getOrCreateThreadPool(getFileFormatShortName, numThreads) } override def getBatchRunner( @@ -1211,7 +1196,7 @@ class MultiFileParquetPartitionReader( evolveSchemaIfNeededAndClose(table, splits.mkString(","), clippedSchema) } - override def writeFileHeader(buffer: HostMemoryBuffer): Long = { + override def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long = { withResource(new HostMemoryOutputStream(buffer)) { out => out.write(ParquetPartitionReader.PARQUET_MAGIC) out.getPos @@ -1414,7 +1399,7 @@ class MultiFileCloudParquetPartitionReader( * @return ThreadPoolExecutor */ override def getThreadPool(numThreads: Int): ThreadPoolExecutor = { - ParquetMultiFileThreadPoolFactory.getThreadPool(getFileFormatShortName, numThreads) + ParquetMultiFileThreadPool.getOrCreateThreadPool(getFileFormatShortName, numThreads) } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 8f0c80c7f4e..df3bfc1ba86 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedC import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, DropTableExec, ShowTablesExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} -import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv} +import org.apache.spark.sql.rapids.{ExternalSource, GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase} /** @@ -334,13 +334,16 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { disableUntilInput: Boolean = false): SparkPlan = plan match { case batchScan: GpuBatchScanExec => if ((batchScan.scan.isInstanceOf[GpuParquetScan] || - batchScan.scan.isInstanceOf[GpuOrcScan]) && + batchScan.scan.isInstanceOf[GpuOrcScan] || + ExternalSource.isSupportedScan(batchScan.scan)) && (disableUntilInput || disableScanUntilInput(batchScan))) { val scanCopy = batchScan.scan match { case parquetScan: GpuParquetScan => 
parquetScan.copy(queryUsesInputFile=true) case orcScan: GpuOrcScan => orcScan.copy(queryUsesInputFile=true) + case eScan if ExternalSource.isSupportedScan(eScan) => + ExternalSource.copyScanWithInputFileTrue(eScan) case _ => throw new RuntimeException("Wrong format") // never reach here } batchScan.copy(scan=scanCopy) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 1679367fb37..b0c7e9b5b8e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -287,6 +287,11 @@ class ConfBuilder(val key: String, val register: ConfEntry[_] => Unit) { } } +object RapidsReaderType extends Enumeration { + type RapidsReaderType = Value + val AUTO, COALESCING, MULTITHREADED, PERFILE = Value +} + object RapidsConf { private val registeredConfs = new ListBuffer[ConfEntry[_]]() @@ -737,10 +742,6 @@ object RapidsConf { .booleanConf .createWithDefault(false) - object ParquetReaderType extends Enumeration { - val AUTO, COALESCING, MULTITHREADED, PERFILE = Value - } - val PARQUET_READER_TYPE = conf("spark.rapids.sql.format.parquet.reader.type") .doc("Sets the parquet reader type. We support different types that are optimized for " + "different environments. The original Spark style reader can be selected by setting this " + @@ -765,8 +766,8 @@ object RapidsConf { "in the cloud. See spark.rapids.cloudSchemes.") .stringConf .transform(_.toUpperCase(java.util.Locale.ROOT)) - .checkValues(ParquetReaderType.values.map(_.toString)) - .createWithDefault(ParquetReaderType.AUTO.toString) + .checkValues(RapidsReaderType.values.map(_.toString)) + .createWithDefault(RapidsReaderType.AUTO.toString) /** List of schemes that are always considered cloud storage schemes */ private lazy val DEFAULT_CLOUD_SCHEMES = @@ -827,11 +828,6 @@ object RapidsConf { .booleanConf .createWithDefault(true) - // This will be deleted when COALESCING is implemented for ORC - object OrcReaderType extends Enumeration { - val AUTO, COALESCING, MULTITHREADED, PERFILE = Value - } - val ORC_READER_TYPE = conf("spark.rapids.sql.format.orc.reader.type") .doc("Sets the orc reader type. We support different types that are optimized for " + "different environments. The original Spark style reader can be selected by setting this " + @@ -856,8 +852,8 @@ object RapidsConf { "in the cloud. See spark.rapids.cloudSchemes.") .stringConf .transform(_.toUpperCase(java.util.Locale.ROOT)) - .checkValues(OrcReaderType.values.map(_.toString)) - .createWithDefault(OrcReaderType.AUTO.toString) + .checkValues(RapidsReaderType.values.map(_.toString)) + .createWithDefault(RapidsReaderType.AUTO.toString) val ORC_MULTITHREAD_READ_NUM_THREADS = conf("spark.rapids.sql.format.orc.multiThreadedRead.numThreads") @@ -941,6 +937,43 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val AVRO_READER_TYPE = conf("spark.rapids.sql.format.avro.reader.type") + .doc("Sets the avro reader type. We support different types that are optimized for " + + "different environments. The original Spark style reader can be selected by setting this " + + "to PERFILE which individually reads and copies files to the GPU. Loading many small files " + + "individually has high overhead, and using COALESCING is " + + "recommended instead. The COALESCING reader is good when using a local file system where " + + "the executors are on the same nodes or close to the nodes the data is being read on. 
" + + "This reader coalesces all the files assigned to a task into a single host buffer before " + + "sending it down to the GPU. It copies blocks from a single file into a host buffer in " + + "separate threads in parallel, see " + + "spark.rapids.sql.format.avro.multiThreadedRead.numThreads. " + + "By default this is set to AUTO so we select the reader we think is best. This will " + + "be COALESCING.") + .stringConf + .transform(_.toUpperCase(java.util.Locale.ROOT)) + .checkValues((RapidsReaderType.values - RapidsReaderType.MULTITHREADED).map(_.toString)) + .createWithDefault(RapidsReaderType.AUTO.toString) + + val AVRO_MULTITHREAD_READ_NUM_THREADS = + conf("spark.rapids.sql.format.avro.multiThreadedRead.numThreads") + .doc("The maximum number of threads, on one executor, to use for reading small " + + "avro files in parallel. This can not be changed at runtime after the executor has " + + "started. Used with MULTITHREADED reader, see " + + "spark.rapids.sql.format.avro.reader.type.") + .integerConf + .createWithDefault(20) + + val AVRO_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL = + conf("spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel") + .doc("A limit on the maximum number of files per task processed in parallel on the CPU " + + "side before the file is sent to the GPU. This affects the amount of host memory used " + + "when reading the files in parallel. Used with MULTITHREADED reader, see " + + "spark.rapids.sql.format.avro.reader.type") + .integerConf + .checkValue(v => v > 0, "The maximum number of files must be greater than 0.") + .createWithDefault(Integer.MAX_VALUE) + val ENABLE_RANGE_WINDOW_BYTES = conf("spark.rapids.sql.window.range.byte.enabled") .doc("When the order-by column of a range based window is byte type and " + "the range boundary calculated for a value has overflow, CPU and GPU will get " + @@ -1018,6 +1051,12 @@ object RapidsConf { .stringConf .createWithDefault(null) + val AVRO_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.avro.debug.dumpPrefix") + .doc("A path prefix where AVRO split file data is dumped for debugging.") + .internal() + .stringConf + .createWithDefault(null) + val HASH_AGG_REPLACE_MODE = conf("spark.rapids.sql.hashAgg.replaceMode") .doc("Only when hash aggregate exec has these modes (\"all\" by default): " + "\"all\" (try to replace all aggregates, default), " + @@ -1593,6 +1632,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val orcDebugDumpPrefix: String = get(ORC_DEBUG_DUMP_PREFIX) + lazy val avroDebugDumpPrefix: String = get(AVRO_DEBUG_DUMP_PREFIX) + lazy val hashAggReplaceMode: String = get(HASH_AGG_REPLACE_MODE) lazy val partialMergeDistinctEnabled: Boolean = get(PARTIAL_MERGE_DISTINCT_ENABLED) @@ -1640,16 +1681,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isParquetInt96WriteEnabled: Boolean = get(ENABLE_PARQUET_INT96_WRITE) lazy val isParquetPerFileReadEnabled: Boolean = - ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.PERFILE + RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == RapidsReaderType.PERFILE lazy val isParquetAutoReaderEnabled: Boolean = - ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.AUTO + RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == RapidsReaderType.AUTO lazy val isParquetCoalesceFileReadEnabled: Boolean = isParquetAutoReaderEnabled || - ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.COALESCING + RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == 
RapidsReaderType.COALESCING lazy val isParquetMultiThreadReadEnabled: Boolean = isParquetAutoReaderEnabled || - ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.MULTITHREADED + RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == RapidsReaderType.MULTITHREADED lazy val parquetMultiThreadReadNumThreads: Int = get(PARQUET_MULTITHREAD_READ_NUM_THREADS) @@ -1666,16 +1707,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE) lazy val isOrcPerFileReadEnabled: Boolean = - OrcReaderType.withName(get(ORC_READER_TYPE)) == OrcReaderType.PERFILE + RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.PERFILE lazy val isOrcAutoReaderEnabled: Boolean = - OrcReaderType.withName(get(ORC_READER_TYPE)) == OrcReaderType.AUTO + RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.AUTO lazy val isOrcCoalesceFileReadEnabled: Boolean = isOrcAutoReaderEnabled || - OrcReaderType.withName(get(ORC_READER_TYPE)) == OrcReaderType.COALESCING + RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.COALESCING lazy val isOrcMultiThreadReadEnabled: Boolean = isOrcAutoReaderEnabled || - OrcReaderType.withName(get(ORC_READER_TYPE)) == OrcReaderType.MULTITHREADED + RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.MULTITHREADED lazy val orcMultiThreadReadNumThreads: Int = get(ORC_MULTITHREAD_READ_NUM_THREADS) @@ -1705,6 +1746,19 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isAvroReadEnabled: Boolean = get(ENABLE_AVRO_READ) + lazy val isAvroPerFileReadEnabled: Boolean = + RapidsReaderType.withName(get(AVRO_READER_TYPE)) == RapidsReaderType.PERFILE + + lazy val isAvroAutoReaderEnabled: Boolean = + RapidsReaderType.withName(get(AVRO_READER_TYPE)) == RapidsReaderType.AUTO + + lazy val isAvroCoalesceFileReadEnabled: Boolean = isAvroAutoReaderEnabled || + RapidsReaderType.withName(get(AVRO_READER_TYPE)) == RapidsReaderType.COALESCING + + lazy val avroMultiThreadReadNumThreads: Int = get(AVRO_MULTITHREAD_READ_NUM_THREADS) + + lazy val maxNumAvroFilesParallel: Int = get(AVRO_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL) + lazy val shuffleManagerEnabled: Boolean = get(SHUFFLE_MANAGER_ENABLED) lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExternalSource.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExternalSource.scala index 84b44fc502c..cfd02b7424c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExternalSource.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExternalSource.scala @@ -20,12 +20,14 @@ import scala.util.{Failure, Success, Try} import com.nvidia.spark.rapids._ -import org.apache.spark.sql.avro.AvroFileFormat -import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.avro.{AvroFileFormat, AvroOptions} +import org.apache.spark.sql.connector.read.{PartitionReaderFactory, Scan} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.v2.avro.AvroScan -import org.apache.spark.util.Utils +import org.apache.spark.util.{SerializableConfiguration, Utils} object ExternalSource { @@ -40,22 +42,40 @@ object ExternalSource { } } - def tagSupportForGpuFileSourceScanExec(meta: SparkPlanMeta[FileSourceScanExec]): Unit 
= { + /** If the file format is supported as an external source */ + def isSupportedFormat(format: FileFormat): Boolean = { if (hasSparkAvroJar) { - meta.wrapped.relation.fileFormat match { - case _: AvroFileFormat => GpuReadAvroFileFormat.tagSupport(meta) - case f => - meta.willNotWorkOnGpu(s"unsupported file format: ${f.getClass.getCanonicalName}") + format match { + case _: AvroFileFormat => true + case _ => false } - } else { + } else false + } + + def isPerFileReadEnabledForFormat(format: FileFormat, conf: RapidsConf): Boolean = { + if (hasSparkAvroJar) { + format match { + case _: AvroFileFormat => conf.isAvroPerFileReadEnabled + case _ => false + } + } else false + } + + def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = { + if (hasSparkAvroJar) { meta.wrapped.relation.fileFormat match { + case _: AvroFileFormat => GpuReadAvroFileFormat.tagSupport(meta) case f => meta.willNotWorkOnGpu(s"unsupported file format: ${f.getClass.getCanonicalName}") } } } - def convertFileFormatForGpuFileSourceScanExec(format: FileFormat): FileFormat = { + /** + * Get a read file format for the input format. + * Better to check if the format is supported first by calling 'isSupportedFormat' + */ + def getReadFileFormat(format: FileFormat): FileFormat = { if (hasSparkAvroJar) { format match { case _: AvroFileFormat => new GpuReadAvroFileFormat @@ -63,11 +83,57 @@ object ExternalSource { throw new IllegalArgumentException(s"${f.getClass.getCanonicalName} is not supported") } } else { + throw new IllegalArgumentException(s"${format.getClass.getCanonicalName} is not supported") + } + } + + /** + * Create a multi-file reader factory for the input format. + * Better to check if the format is supported first by calling 'isSupportedFormat' + */ + def createMultiFileReaderFactory( + format: FileFormat, + broadcastedConf: Broadcast[SerializableConfiguration], + pushedFilters: Array[Filter], + fileScan: GpuFileSourceScanExec): PartitionReaderFactory = { + if (hasSparkAvroJar) { format match { - case f => - throw new IllegalArgumentException(s"${f.getClass.getCanonicalName} is not supported") + case _: AvroFileFormat => + val f = GpuAvroMultiFilePartitionReaderFactory( + fileScan.relation.sparkSession.sessionState.conf, + fileScan.rapidsConf, + broadcastedConf, + fileScan.relation.dataSchema, + fileScan.requiredSchema, + fileScan.relation.partitionSchema, + new AvroOptions(fileScan.relation.options, broadcastedConf.value.value), + fileScan.allMetrics, + pushedFilters, + fileScan.queryUsesInputFile) + // Now only coalescing is supported, so need to check if it can be used + // for the final choice. 
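The choice between the multi-file factory built here and the per-file fallback just below is driven by the new reader-type configuration. A small, illustrative way to exercise the coalescing path end to end; the application name and input path are placeholders, and the RAPIDS plugin plus the spark-avro jar are assumed to be on the classpath:

import org.apache.spark.sql.SparkSession

object AvroCoalescingReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("avro-coalescing-example") // placeholder
      .config("spark.rapids.sql.format.avro.enabled", "true")
      .config("spark.rapids.sql.format.avro.read.enabled", "true")
      // AUTO, the default, also resolves to COALESCING for Avro.
      .config("spark.rapids.sql.format.avro.reader.type", "COALESCING")
      .getOrCreate()
    // Placeholder input path.
    spark.read.format("avro").load("/data/avro/input").show()
  }
}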
+ if (f.canUseCoalesceFilesReader){ + f + } else { + // Fall back to PerFile reading + GpuAvroPartitionReaderFactory( + fileScan.relation.sparkSession.sessionState.conf, + fileScan.rapidsConf, + broadcastedConf, + fileScan.relation.dataSchema, + fileScan.requiredSchema, + fileScan.relation.partitionSchema, + new AvroOptions(fileScan.relation.options, broadcastedConf.value.value), + fileScan.allMetrics) + } + case _ => + // never reach here + throw new RuntimeException(s"File format $format is not supported yet") } + } else { + throw new RuntimeException(s"File format $format is not supported yet") } + } def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = { @@ -85,6 +151,7 @@ object ExternalSource { a.readDataSchema, a.readPartitionSchema, a.options, + a.pushedFilters, conf, a.partitionFilters, a.dataFilters) @@ -93,4 +160,29 @@ object ExternalSource { } else Map.empty } + /** If the scan is supported as an external source */ + def isSupportedScan(scan: Scan): Boolean = { + if (hasSparkAvroJar) { + scan match { + case _: GpuAvroScan => true + case _ => false + } + } else false + } + + /** + * Clone the input scan with setting 'true' to the 'queryUsesInputFile'. + * Better to check if the scan is supported first by calling 'isSupportedScan'. + */ + def copyScanWithInputFileTrue(scan: Scan): Scan = { + if (hasSparkAvroJar) { + scan match { + case avroScan: GpuAvroScan => avroScan.copy(queryUsesInputFile=true) + case _ => + throw new RuntimeException(s"Unsupported scan type: ${scan.getClass.getSimpleName}") + } + } else { + throw new RuntimeException(s"Unsupported scan type: ${scan.getClass.getSimpleName}") + } + } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index b2aaa3e3b26..ff4b15b0a75 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -16,17 +16,21 @@ package org.apache.spark.sql.rapids -import java.io.OutputStream +import java.io.{FileNotFoundException, IOException, OutputStream} import java.net.URI +import java.util.concurrent.{Callable, ThreadPoolExecutor} import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.collection.JavaConverters.mapAsScalaMapConverter -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} +import scala.language.implicitConversions import scala.math.max import ai.rapids.cudf.{AvroOptions => CudfAvroOptions, HostMemoryBuffer, NvtxColor, NvtxRange, Table} -import com.nvidia.spark.rapids.{Arm, AvroDataFileReader, AvroFormatType, BlockInfo, ColumnarPartitionReaderWithPartitionValues, FileFormatChecks, FilePartitionReaderBase, GpuBatchUtils, GpuColumnVector, GpuMetric, GpuSemaphore, Header, HostMemoryOutputStream, NvtxWithMetrics, PartitionReaderWithBytesRead, RapidsConf, RapidsMeta, ReadFileOp, ScanMeta, ScanWithMetrics} +import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuMetric.{GPU_DECODE_TIME, NUM_OUTPUT_BATCHES, PEAK_DEVICE_MEMORY, READ_FS_TIME, SEMAPHORE_WAIT_TIME, WRITE_BUFFER_TIME} +import org.apache.avro.Schema import org.apache.avro.file.DataFileConstants.SYNC_SIZE import org.apache.avro.mapred.FsInput import org.apache.hadoop.conf.Configuration @@ -44,7 +48,9 @@ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningAwareFileIndex} import 
org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, FileScan} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.rapids.shims.AvroUtils +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.v2.avro.AvroScan @@ -68,9 +74,6 @@ object GpuAvroScan { options: Map[String, String], meta: RapidsMeta[_, _, _]): Unit = { - val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) - val parsedOptions = new AvroOptions(options, hadoopConf) - if (!meta.conf.isAvroEnabled) { meta.willNotWorkOnGpu("Avro input and output has been disabled. To enable set " + s"${RapidsConf.ENABLE_AVRO} to true") @@ -81,6 +84,8 @@ object GpuAvroScan { s"${RapidsConf.ENABLE_AVRO_READ} to true") } + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) + val parsedOptions = new AvroOptions(options, hadoopConf) AvroUtils.tagSupport(parsedOptions, meta) FileFormatChecks.tag(meta, readSchema, AvroFormatType, ReadFileOp) @@ -95,14 +100,13 @@ case class GpuAvroScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, + pushedFilters: Array[Filter], rapidsConf: RapidsConf, partitionFilters: Seq[Expression] = Seq.empty, - dataFilters: Seq[Expression] = Seq.empty) extends FileScan with ScanWithMetrics { + dataFilters: Seq[Expression] = Seq.empty, + queryUsesInputFile: Boolean = false) extends FileScan with ScanWithMetrics { override def isSplitable(path: Path): Boolean = true - @scala.annotation.nowarn( - "msg=value ignoreExtension in class AvroOptions is deprecated*" - ) override def createReaderFactory(): PartitionReaderFactory = { val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap // Hadoop Configurations are case sensitive. @@ -112,36 +116,55 @@ case class GpuAvroScan( val parsedOptions = new AvroOptions(caseSensitiveMap, hadoopConf) // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. - GpuAvroPartitionReaderFactory( - sparkSession.sessionState.conf, - broadcastedConf, - dataSchema, - readDataSchema, - readPartitionSchema, - rapidsConf, - parsedOptions.ignoreExtension, - metrics) + if (rapidsConf.isAvroPerFileReadEnabled) { + GpuAvroPartitionReaderFactory(sparkSession.sessionState.conf, rapidsConf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, parsedOptions, metrics) + } else { + val f = GpuAvroMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, + rapidsConf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, + parsedOptions, metrics, pushedFilters, queryUsesInputFile) + // Now only coalescing is supported, so need to check it can be used for the final choice. 
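Whichever factory is picked below, the per-split block selection keeps an Avro block only when its start lies inside the split once the 16-byte sync marker is accounted for. A condensed sketch of that test; blockBelongsToSplit is a hypothetical helper name, and the arithmetic mirrors the filterBlocks logic being refactored further down in this file:

import org.apache.avro.file.DataFileConstants.SYNC_SIZE
import com.nvidia.spark.rapids.BlockInfo

object AvroBlockFilterSketch {
  // A block belongs to this split when its start, minus the preceding sync marker,
  // is at or after the split start, and it has not yet passed the sync marker that
  // follows the split end.
  def blockBelongsToSplit(block: BlockInfo, splitStart: Long, splitLength: Long): Boolean = {
    val splitEnd = splitStart + splitLength
    splitStart <= block.blockStart - SYNC_SIZE && block.blockStart < splitEnd + SYNC_SIZE
  }
}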
+ if (f.canUseCoalesceFilesReader){ + f + } else { + // Fall back to PerFile reading + GpuAvroPartitionReaderFactory(sparkSession.sessionState.conf, rapidsConf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, parsedOptions, metrics) + } + } } // overrides nothing in 330 - def withFilters( - partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = + def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) + override def equals(obj: Any): Boolean = obj match { + case a: GpuAvroScan => + super.equals(a) && dataSchema == a.dataSchema && options == a.options && + equivalentFilters(pushedFilters, a.pushedFilters) && rapidsConf == a.rapidsConf && + queryUsesInputFile == a.queryUsesInputFile + case _ => false + } + + override def hashCode(): Int = getClass.hashCode() + + override def description(): String = { + super.description() + ", PushedFilters: " + seqToString(pushedFilters) + } } /** Avro partition reader factory to build columnar reader */ case class GpuAvroPartitionReaderFactory( - sqlConf: SQLConf, + @transient sqlConf: SQLConf, + @transient rapidsConf: RapidsConf, broadcastedConf: Broadcast[SerializableConfiguration], dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - @transient rapidsConf: RapidsConf, - ignoreExtension: Boolean, + options: AvroOptions, metrics: Map[String, GpuMetric]) extends FilePartitionReaderFactory with Logging { - private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix + private val debugDumpPrefix = Option(rapidsConf.avroDebugDumpPrefix) private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes @@ -151,92 +174,322 @@ case class GpuAvroPartitionReaderFactory( override def buildColumnarReader(partFile: PartitionedFile): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value - val blockMeta = GpuAvroFileFilterHandler(sqlConf, broadcastedConf, - ignoreExtension, broadcastedConf.value.value).filterBlocks(partFile) - val reader = new PartitionReaderWithBytesRead(new AvroPartitionReader(conf, partFile, blockMeta, - readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, + val blockMeta = AvroFileFilterHandler(conf, options).filterBlocks(partFile) + val reader = new PartitionReaderWithBytesRead(new GpuAvroPartitionReader(conf, partFile, + blockMeta, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics)) ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema) } } /** - * A tool to filter Avro blocks - * - * @param sqlConf SQLConf - * @param broadcastedConf the Hadoop configuration + * The multi-file partition reader factory for cloud or coalescing reading of avro file format. 
*/ -private case class GpuAvroFileFilterHandler( +case class GpuAvroMultiFilePartitionReaderFactory( @transient sqlConf: SQLConf, + @transient rapidsConf: RapidsConf, broadcastedConf: Broadcast[SerializableConfiguration], - ignoreExtension: Boolean, - hadoopConf: Configuration) extends Arm with Logging { + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + options: AvroOptions, + metrics: Map[String, GpuMetric], + filters: Array[Filter], + queryUsesInputFile: Boolean) + extends MultiFilePartitionReaderFactoryBase(sqlConf, broadcastedConf, rapidsConf) { - def filterBlocks(partFile: PartitionedFile): AvroBlockMeta = { + private val debugDumpPrefix = Option(rapidsConf.avroDebugDumpPrefix) + private val ignoreMissingFiles = sqlConf.ignoreMissingFiles + private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles + + private val numThreads = rapidsConf.avroMultiThreadReadNumThreads + private val maxNumFileProcessed = rapidsConf.maxNumAvroFilesParallel + + // we can't use the coalescing files reader when InputFileName, InputFileBlockStart, + // or InputFileBlockLength because we are combining all the files into a single buffer + // and we don't know which file is associated with each row. + override val canUseCoalesceFilesReader: Boolean = + rapidsConf.isAvroCoalesceFileReadEnabled && !(queryUsesInputFile || ignoreCorruptFiles) + + // disbale multi-threaded until it is supported. + override val canUseMultiThreadReader: Boolean = false + + /** + * File format short name used for logging and other things to uniquely identity + * which file format is being used. + */ + override final def getFileFormatShortName: String = "AVRO" + + /** + * Build the PartitionReader for cloud reading + */ + override def buildBaseColumnarReaderForCloud( + files: Array[PartitionedFile], + conf: Configuration): PartitionReader[ColumnarBatch] = { + throw new UnsupportedOperationException() + } - def passSync(blockStart: Long, position: Long): Boolean = { - blockStart >= position + SYNC_SIZE + /** + * Build the PartitionReader for coalescing reading + * + * @param files files to be read + * @param conf the configuration + * @return a PartitionReader of coalescing reading + */ + override def buildBaseColumnarReaderForCoalescing( + files: Array[PartitionedFile], + conf: Configuration): PartitionReader[ColumnarBatch] = { + val clippedBlocks = ArrayBuffer[AvroSingleDataBlockInfo]() + val mapPathHeader = LinkedHashMap[Path, Header]() + val filterHandler = AvroFileFilterHandler(conf, options) + files.foreach { file => + val singleFileInfo = try { + filterHandler.filterBlocks(file) + } catch { + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: ${file.filePath}", e) + AvroBlockMeta(null, Seq.empty) + // Throw FileNotFoundException even if `ignoreCorruptFiles` is true + case e: FileNotFoundException if !ignoreMissingFiles => throw e + case e @(_: RuntimeException | _: IOException) if ignoreCorruptFiles => + logWarning( + s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e) + AvroBlockMeta(null, Seq.empty) + } + val fPath = new Path(new URI(file.filePath)) + clippedBlocks ++= singleFileInfo.blocks.map(block => + AvroSingleDataBlockInfo( + fPath, + AvroDataBlock(block), + file.partitionValues, + AvroSchemaWrapper(singleFileInfo.header.schema), + AvroExtraInfo())) + if (singleFileInfo.blocks.nonEmpty) { + // No need to check the header since it can not be null when blocks is not empty here. 
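The header recorded here (see mapPathHeader.put just below) is what lets the coalescing reader size its single host buffer and later write one merged header. A sketch of that sizing, mirroring the estimateOutputSize helper defined further down; Header.firstBlockStart falls back to measuring a serialized header when the size was not captured while parsing:

import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
import com.nvidia.spark.rapids.{AvroFileWriter, BlockInfo, Header}

object AvroBufferSizeSketch {
  // One (merged) header plus every selected block copied verbatim.
  def estimateOutputSize(blocks: Seq[BlockInfo], header: Header): Long =
    header.firstBlockStart + blocks.map(_.blockLength).sum

  // Equivalent of the lazy fallback inside Header.firstBlockStart: serialize the
  // header into a counting sink to learn its size without keeping the bytes.
  def measuredHeaderSize(header: Header): Long = {
    val counter = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM)
    AvroFileWriter(counter).writeHeader(header)
    counter.getByteCount
  }
}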
+ mapPathHeader.put(fPath, singleFileInfo.header) + } } + new GpuMultiFileAvroPartitionReader(conf, files, clippedBlocks, readDataSchema, + partitionSchema, maxReadBatchSizeRows, maxReadBatchSizeBytes, numThreads, + debugDumpPrefix, metrics, mapPathHeader.toMap) + } - if (ignoreExtension || partFile.filePath.endsWith(".avro")) { - val in = new FsInput(new Path(new URI(partFile.filePath)), hadoopConf) - closeOnExcept(in) { _ => - withResource(AvroDataFileReader.openReader(in)) { reader => - val blocks = reader.getBlocks() - val filteredBlocks = new ArrayBuffer[BlockInfo]() - blocks.foreach(block => { - if (partFile.start <= block.blockStart - SYNC_SIZE && - !passSync(block.blockStart, partFile.start + partFile.length)) { - filteredBlocks.append(block) +} + +/** A trait collecting common methods across the 3 kinds of avro readers */ +trait GpuAvroReaderBase extends Arm with Logging { self: FilePartitionReaderBase => + private val avroFormat = Some("avro") + + def debugDumpPrefix: Option[String] + + def readDataSchema: StructType + + /** + * Read the host data to GPU for decoding, and return it as a cuDF Table. + * The input host buffer should contain valid data, otherwise the behavior is + * undefined. + * 'splits' is used only for debugging. + */ + protected final def sendToGpuUnchecked( + hostBuf: HostMemoryBuffer, + bufSize: Long, + splits: Array[PartitionedFile]): Table = { + // Dump buffer for debugging when required + dumpDataToFile(hostBuf, bufSize, splits, debugDumpPrefix, avroFormat) + + val readOpts = CudfAvroOptions.builder() + .includeColumn(readDataSchema.fieldNames.toSeq: _*) + .build() + // about to start using the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME)) + + withResource(new NvtxWithMetrics("Avro decode", + NvtxColor.DARK_GREEN, metrics(GPU_DECODE_TIME))) { _ => + Table.readAvro(readOpts, hostBuf, 0, bufSize) + } + } + + /** + * Send a host buffer to GPU for decoding, and return it as a ColumnarBatch. + * The input hostBuf will be closed after returning, please do not use it anymore. + * 'splits' is used only for debugging. + */ + protected final def sendToGpu( + hostBuf: HostMemoryBuffer, + bufSize: Long, + splits: Array[PartitionedFile]): Option[ColumnarBatch] = { + withResource(hostBuf) { _ => + if (bufSize == 0) { + None + } else { + withResource(sendToGpuUnchecked(hostBuf, bufSize, splits)) { t => + val batchSizeBytes = GpuColumnVector.getTotalDeviceMemoryUsed(t) + logDebug(s"GPU batch size: $batchSizeBytes bytes") + maxDeviceMemory = max(batchSizeBytes, maxDeviceMemory) + metrics(NUM_OUTPUT_BATCHES) += 1 + // convert to batch + Some(GpuColumnVector.from(t, GpuColumnVector.extractTypes(readDataSchema))) + } + } // end of else + } + } + + /** + * Get the block chunk according to the max batch size and max rows. 
+ * + * @param blockIter blocks to be evaluated + * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader + * reads per batch + * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader + * reads per batch + * @return + */ + protected final def populateCurrentBlockChunk( + blockIter: BufferedIterator[BlockInfo], + maxReadBatchSizeRows: Int, + maxReadBatchSizeBytes: Long): Seq[BlockInfo] = { + + val currentChunk = new ArrayBuffer[BlockInfo] + var numRows, numBytes, numAvroBytes: Long = 0 + + @tailrec + def readNextBatch(): Unit = { + if (blockIter.hasNext) { + val peekedRowGroup = blockIter.head + if (peekedRowGroup.count > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Too many rows in split") + } + if (numRows == 0 || numRows + peekedRowGroup.count <= maxReadBatchSizeRows) { + val estBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, peekedRowGroup.count) + if (numBytes == 0 || numBytes + estBytes <= maxReadBatchSizeBytes) { + currentChunk += blockIter.next() + numRows += currentChunk.last.count + numAvroBytes += currentChunk.last.blockDataSize + numBytes += estBytes + readNextBatch() + } + } + } + } + + readNextBatch() + logDebug(s"Loaded $numRows rows from Avro. bytes read: $numAvroBytes. " + + s"Estimated GPU bytes: $numBytes") + currentChunk + } + + /** Read a split into a host buffer, preparing for sending to GPU */ + protected final def readPartFile( + partFilePath: Path, + blocks: Seq[BlockInfo], + header: Header, + conf: Configuration): (HostMemoryBuffer, Long) = { + withResource(new NvtxWithMetrics("Avro buffer file split", NvtxColor.YELLOW, + metrics("bufferTime"))) { _ => + if (blocks.isEmpty) { + // No need to check the header here since it can not be null when blocks is not empty. + return (null, 0L) + } + val estOutSize = estimateOutputSize(blocks, header) + withResource(partFilePath.getFileSystem(conf).open(partFilePath)) { in => + closeOnExcept(HostMemoryBuffer.allocate(estOutSize)) { hmb => + withResource(new HostMemoryOutputStream(hmb)) { out => + val headerAndBlocks = BlockInfo(0, header.firstBlockStart, 0, 0) +: blocks + copyBlocksData(headerAndBlocks, in, out) + // check we didn't go over memory + if (out.getPos > estOutSize) { + throw new QueryExecutionException(s"Calculated buffer size $estOutSize is" + + s" too small, actual written: ${out.getPos}") } - }) - AvroBlockMeta(reader.getHeader(), filteredBlocks) + (hmb, out.getPos) + } } } - } else { - AvroBlockMeta(new Header(), Seq.empty) } } -} -/** - * Avro block meta info - * - * @param header the header of avro file - * @param blocks the total block info of avro file - */ -case class AvroBlockMeta(header: Header, blocks: Seq[BlockInfo]) + /** Estimate the total size from the given blocks and header */ + protected final def estimateOutputSize(blocks: Seq[BlockInfo], header: Header): Long = { + // Start from the Header + var totalSize: Long = header.firstBlockStart + // Add all blocks + totalSize += blocks.map(_.blockLength).sum + totalSize + } -/** - * CopyRange to indicate from where to copy. 
- * - * @param offset from where to copy - * @param length how many bytes to copy - */ -case class CopyRange(offset: Long, length: Long) + /** Copy the data specified by the blocks from `in` to `out` */ + protected final def copyBlocksData( + blocks: Seq[BlockInfo], + in: FSDataInputStream, + out: OutputStream): Seq[BlockInfo] = { + val copyRanges = computeCopyRanges(blocks) + // copy cache: 8MB + val copyCache = new Array[Byte](8 * 1024 * 1024) + var readTime, writeTime = 0L + + copyRanges.foreach { range => + if (in.getPos != range.offset) { + in.seek(range.offset) + } + var bytesLeft = range.length + while (bytesLeft > 0) { + // downcast is safe because copyBuffer.length is an int + val readLength = Math.min(bytesLeft, copyCache.length).toInt + val start = System.nanoTime() + in.readFully(copyCache, 0, readLength) + val mid = System.nanoTime() + out.write(copyCache, 0, readLength) + val end = System.nanoTime() + readTime += (mid - start) + writeTime += (end - mid) + bytesLeft -= readLength + } + } + metrics.get(READ_FS_TIME).foreach(_.add(readTime)) + metrics.get(WRITE_BUFFER_TIME).foreach(_.add(writeTime)) + blocks + } -/** - * - * @param conf the Hadoop configuration - * @param partFile the partitioned files to read - * @param blockMeta the block meta info of partFile - * @param readDataSchema the Spark schema describing what will be read - * @param debugDumpPrefix a path prefix to use for dumping the fabricated avro data or null - * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader reads per batch - * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader reads per batch - * @param execMetrics metrics - */ -class AvroPartitionReader( + /** + * Calculate the copy ranges from blocks. + * And it will try to combine the sequential blocks. + */ + private def computeCopyRanges(blocks: Seq[BlockInfo]): Array[CopyRange] = { + var currentCopyStart, currentCopyEnd = 0L + val copyRanges = new ArrayBuffer[CopyRange]() + + blocks.foreach { block => + if (currentCopyEnd != block.blockStart) { + if (currentCopyEnd != 0) { + copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) + } + currentCopyStart = block.blockStart + currentCopyEnd = currentCopyStart + } + currentCopyEnd += block.blockLength + } + + if (currentCopyEnd != currentCopyStart) { + copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) + } + copyRanges.toArray + } + +} + +/** A PartitionReader that reads an AVRO file split on the GPU. 
*/ +class GpuAvroPartitionReader( conf: Configuration, partFile: PartitionedFile, blockMeta: AvroBlockMeta, - readDataSchema: StructType, - debugDumpPrefix: String, + override val readDataSchema: StructType, + override val debugDumpPrefix: Option[String], maxReadBatchSizeRows: Integer, maxReadBatchSizeBytes: Long, - execMetrics: Map[String, GpuMetric]) extends FilePartitionReaderBase(conf, execMetrics) { + execMetrics: Map[String, GpuMetric]) + extends FilePartitionReaderBase(conf, execMetrics) with GpuAvroReaderBase { - val filePath = new Path(new URI(partFile.filePath)) + private val partFilePath = new Path(new URI(partFile.filePath)) private val blockIterator: BufferedIterator[BlockInfo] = blockMeta.blocks.iterator.buffered override def next(): Boolean = { @@ -273,200 +526,238 @@ class AvroPartitionReader( Some(new ColumnarBatch(Array.empty, numRows.toInt)) } } else { - val table = readToTable(currentChunkedBlocks) - try { - val colTypes = readDataSchema.fields.map(f => f.dataType) - val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes)) - maybeBatch.foreach { batch => - logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") - } - maybeBatch - } finally { - table.foreach(_.close()) + if (currentChunkedBlocks.isEmpty) { + None + } else { + val (dataBuffer, dataSize) = readPartFile(partFilePath, currentChunkedBlocks, + blockMeta.header, conf) + sendToGpu(dataBuffer, dataSize, Array(partFile)) } } } } - private def readToTable(currentChunkedBlocks: Seq[BlockInfo]): Option[Table] = { - if (currentChunkedBlocks.isEmpty) { - return None +} + +/** + * A PartitionReader that can read multiple AVRO files up to the certain size. It will + * coalesce small files together and copy the block data in a separate thread pool to speed + * up processing the small files before sending down to the GPU. + */ +class GpuMultiFileAvroPartitionReader( + conf: Configuration, + splits: Array[PartitionedFile], + clippedBlocks: Seq[AvroSingleDataBlockInfo], + override val readDataSchema: StructType, + partitionSchema: StructType, + maxReadBatchSizeRows: Integer, + maxReadBatchSizeBytes: Long, + numThreads: Int, + override val debugDumpPrefix: Option[String], + execMetrics: Map[String, GpuMetric], + mapPathHeader: Map[Path, Header]) + extends MultiFileCoalescingPartitionReaderBase(conf, clippedBlocks, readDataSchema, + partitionSchema, maxReadBatchSizeRows, maxReadBatchSizeBytes, numThreads, + execMetrics) with GpuAvroReaderBase { + + override def checkIfNeedToSplitDataBlock( + currentBlockInfo: SingleDataBlockInfo, + nextBlockInfo: SingleDataBlockInfo): Boolean = { + val nextHeader = mapPathHeader.get(nextBlockInfo.filePath).get + val curHeader = mapPathHeader.get(currentBlockInfo.filePath).get + // Split into another block when + // 1) the sync markers are different, or + // 2) a key exists in both of the two headers' metadata, and maps to different values. 
+    //if (!Header.hasSameSync(nextHeader, curHeader)) {
+    //  logInfo(s"Avro sync marker in the next file ${nextBlockInfo.filePath}" +
+    //    s" differs from the current one in file ${currentBlockInfo.filePath}," +
+    //    s" splitting it into a new batch!")
+    //  return true
+    //}
+    if (Header.hasConflictInMetadata(nextHeader, curHeader)) {
+      logInfo(s"Avro metadata in the next file ${nextBlockInfo.filePath}" +
+        s" conflicts with the current one in file ${currentBlockInfo.filePath}," +
+        s" splitting it into a new batch!")
+      return true
     }
-    val (dataBuffer, dataSize) = readPartFile(currentChunkedBlocks, filePath)
-    try {
-      if (dataSize == 0) {
-        None
-      } else {
-        // Dump data into a file
-        dumpDataToFile(dataBuffer, dataSize, Array(partFile), Option(debugDumpPrefix), Some("avro"))
+    false
+  }
+
+  override def calculateEstimatedBlocksOutputSize(
+      blocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
+      schema: SchemaBase): Long = {
+    // Get headers according to the input paths.
+    val headers = blocks.keys.map(mapPathHeader.get(_).get)
+    // Merging the meta is safe because the compatibility has been verified
+    // in 'checkIfNeedToSplitDataBlock'
+    Header.mergeMetadata(headers.toSeq).map { mergedHeader =>
+      val allBlocks = blocks.values.flatten.toSeq
+      estimateOutputSize(allBlocks, mergedHeader)
+    } getOrElse 0L
+  }
+
+  override def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long = {
+    // Get headers according to the input paths.
+    val headers = paths.map(mapPathHeader.get(_).get)
+    // Merging the meta is safe because the compatibility has been verified
+    // in 'checkIfNeedToSplitDataBlock'
+    Header.mergeMetadata(headers).map { mergedHeader =>
+      withResource(new HostMemoryOutputStream(buffer)) { out =>
+        AvroFileWriter(out).writeHeader(mergedHeader)
+        out.getPos
+      }
+    } getOrElse 0L
+  }
 
-    val includeColumns = readDataSchema.fieldNames.toSeq
+  override def calculateFinalBlocksOutputSize(footerOffset: Long,
+      blocks: Seq[DataBlockBase], schema: SchemaBase): Long = {
+    // In 'calculateEstimatedBlocksOutputSize', we have already computed the exact size for
+    // Header + All Blocks. 
+ footerOffset + } - val parseOpts = CudfAvroOptions.builder() - .includeColumn(includeColumns: _*).build() + override def writeFileFooter(buffer: HostMemoryBuffer, bufferSize: Long, footerOffset: Long, + blocks: Seq[DataBlockBase], clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = { + // AVRO files have no footer, do nothing + (buffer, bufferSize) + } - // about to start using the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME)) + override def readBufferToTable(dataBuffer: HostMemoryBuffer, dataSize: Long, + clippedSchema: SchemaBase, extraInfo: ExtraInfo): Table = { + sendToGpuUnchecked(dataBuffer, dataSize, splits) + } - val table = withResource(new NvtxWithMetrics("Avro decode", NvtxColor.DARK_GREEN, - metrics(GPU_DECODE_TIME))) { _ => - Table.readAvro(parseOpts, dataBuffer, 0, dataSize) - } - closeOnExcept(table) { _ => - maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) - if (readDataSchema.length < table.getNumberOfColumns) { - throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " + - s"but read ${table.getNumberOfColumns} from $filePath") + override def getThreadPool(numThreads: Int): ThreadPoolExecutor = + AvroMultiFileThreadPool.getOrCreateThreadPool(getFileFormatShortName, numThreads) + + override final def getFileFormatShortName: String = "AVRO" + + override def getBatchRunner( + tc: TaskContext, + file: Path, + outhmb: HostMemoryBuffer, + blocks: ArrayBuffer[DataBlockBase], + offset: Long): Callable[(Seq[DataBlockBase], Long)] = + new AvroCopyBlocksRunner(tc, file, outhmb, blocks, offset) + + // The runner to copy blocks to offset of HostMemoryBuffer + class AvroCopyBlocksRunner( + taskContext: TaskContext, + file: Path, + outhmb: HostMemoryBuffer, + blocks: ArrayBuffer[DataBlockBase], + offset: Long) + extends Callable[(Seq[DataBlockBase], Long)] { + + override def call(): (Seq[DataBlockBase], Long) = { + TrampolineUtil.setTaskContext(taskContext) + try { + val startBytesRead = fileSystemBytesRead() + val res = withResource(outhmb) { _ => + withResource(file.getFileSystem(conf).open(file)) { in => + withResource(new HostMemoryOutputStream(outhmb)) { out => + copyBlocksData(blocks, in, out) + } } } - metrics(NUM_OUTPUT_BATCHES) += 1 - Some(table) + val bytesRead = fileSystemBytesRead() - startBytesRead + (res, bytesRead) + } finally { + TrampolineUtil.unsetTaskContext() } - } finally { - dataBuffer.close() } } - /** Copy the data into HMB */ - protected def copyDataRange( - range: CopyRange, - in: FSDataInputStream, - out: OutputStream, - copyBuffer: Array[Byte]): Unit = { - var readTime = 0L - var writeTime = 0L - if (in.getPos != range.offset) { - in.seek(range.offset) - } - var bytesLeft = range.length - while (bytesLeft > 0) { - // downcast is safe because copyBuffer.length is an int - val readLength = Math.min(bytesLeft, copyBuffer.length).toInt - val start = System.nanoTime() - in.readFully(copyBuffer, 0, readLength) - val mid = System.nanoTime() - out.write(copyBuffer, 0, readLength) - val end = System.nanoTime() - readTime += (mid - start) - writeTime += (end - mid) - bytesLeft -= readLength - } - execMetrics.get(READ_FS_TIME).foreach(_.add(readTime)) - execMetrics.get(WRITE_BUFFER_TIME).foreach(_.add(writeTime)) - } + // Some implicits for conversions between the base class to the sub-class + implicit def toAvroSchema(schema: SchemaBase): Schema = + schema.asInstanceOf[AvroSchemaWrapper].schema - /** - * Tried to combine the sequential blocks - * @param blocks 
blocks to be combined - * @param blocksRange the list of combined ranges - */ - private def combineBlocks(blocks: Seq[BlockInfo], - blocksRange: ArrayBuffer[CopyRange]) = { - var currentCopyStart = 0L - var currentCopyEnd = 0L + implicit def toBlockInfo(block: DataBlockBase): BlockInfo = + block.asInstanceOf[AvroDataBlock].blockInfo - // Combine the meta and blocks into a seq to get the copy range - val metaAndBlocks: Seq[BlockInfo] = - Seq(BlockInfo(0, blockMeta.header.getFirstBlockStart, 0, 0)) ++ blocks + implicit def toBlockInfos(blocks: Seq[DataBlockBase]): Seq[BlockInfo] = + blocks.map(toBlockInfo(_)) - metaAndBlocks.foreach { block => - if (currentCopyEnd != block.blockStart) { - if (currentCopyEnd != 0) { - blocksRange.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) - } - currentCopyStart = block.blockStart - currentCopyEnd = currentCopyStart - } - currentCopyEnd += block.blockLength - } + implicit def toBlockBases(blocks: Seq[BlockInfo]): Seq[DataBlockBase] = + blocks.map(AvroDataBlock(_)) - if (currentCopyEnd != currentCopyStart) { - blocksRange.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart)) - } + implicit def toAvroExtraInfo(in: ExtraInfo): AvroExtraInfo = + in.asInstanceOf[AvroExtraInfo] + +} + +/** Singleton threadpool that is used across all the tasks. */ +object AvroMultiFileThreadPool extends MultiFileReaderThreadPool + +/** A tool to filter Avro blocks */ +case class AvroFileFilterHandler( + hadoopConf: Configuration, + @transient options: AvroOptions) extends Arm with Logging { + + @scala.annotation.nowarn( + "msg=value ignoreExtension in class AvroOptions is deprecated*" + ) + val ignoreExtension = options.ignoreExtension + + private def passSync(blockStart: Long, position: Long): Boolean = { + blockStart >= position + SYNC_SIZE } - protected def readPartFile( - blocks: Seq[BlockInfo], - filePath: Path): (HostMemoryBuffer, Long) = { - withResource(new NvtxWithMetrics("Avro buffer file split", NvtxColor.YELLOW, - metrics("bufferTime"))) { _ => - withResource(filePath.getFileSystem(conf).open(filePath)) { in => - val estTotalSize = calculateOutputSize(blocks) - closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb => - val out = new HostMemoryOutputStream(hmb) - val copyRanges = new ArrayBuffer[CopyRange]() - combineBlocks(blocks, copyRanges) - val copyBuffer = new Array[Byte](8 * 1024 * 1024) - copyRanges.foreach(copyRange => copyDataRange(copyRange, in, out, copyBuffer)) - // check we didn't go over memory - if (out.getPos > estTotalSize) { - throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " + - s"small, actual written: ${out.getPos}") - } - (hmb, out.getPos) + def filterBlocks(partFile: PartitionedFile): AvroBlockMeta = { + if (ignoreExtension || partFile.filePath.endsWith(".avro")) { + val in = new FsInput(new Path(new URI(partFile.filePath)), hadoopConf) + closeOnExcept(in) { _ => + withResource(AvroDataFileReader.openReader(in)) { reader => + val blocks = reader.getBlocks() + val filteredBlocks = new ArrayBuffer[BlockInfo]() + blocks.foreach(block => { + if (partFile.start <= block.blockStart - SYNC_SIZE && + !passSync(block.blockStart, partFile.start + partFile.length)) { + filteredBlocks.append(block) + } + }) + AvroBlockMeta(reader.getHeader(), filteredBlocks) } } + } else { + AvroBlockMeta(null, Seq.empty) } } +} - /** - * Calculate the combined size - * @param currentChunkedBlocks the blocks to calculated - * @return the total size of blocks + header - */ - protected def 
calculateOutputSize(currentChunkedBlocks: Seq[BlockInfo]): Long = { - var totalSize: Long = 0; - // For simplicity, we just copy the whole meta of AVRO - totalSize += blockMeta.header.getFirstBlockStart - // Add all blocks - totalSize += currentChunkedBlocks.map(_.blockLength).sum - totalSize - } +/** + * Avro block meta info + * + * @param header the header of avro file + * @param blocks the total block info of avro file + */ +case class AvroBlockMeta(header: Header, blocks: Seq[BlockInfo]) - /** - * Get the block chunk according to the max batch size and max rows. - * - * @param blockIter blocks to be evaluated - * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader - * reads per batch - * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader - * reads per batch - * @return - */ - protected def populateCurrentBlockChunk( - blockIter: BufferedIterator[BlockInfo], - maxReadBatchSizeRows: Int, - maxReadBatchSizeBytes: Long): Seq[BlockInfo] = { - val currentChunk = new ArrayBuffer[BlockInfo] - var numRows: Long = 0 - var numBytes: Long = 0 - var numAvroBytes: Long = 0 +/** + * CopyRange to indicate from where to copy. + * + * @param offset from where to copy + * @param length how many bytes to copy + */ +private case class CopyRange(offset: Long, length: Long) - @tailrec - def readNextBatch(): Unit = { - if (blockIter.hasNext) { - val peekedRowGroup = blockIter.head - if (peekedRowGroup.count > Integer.MAX_VALUE) { - throw new UnsupportedOperationException("Too many rows in split") - } - if (numRows == 0 || numRows + peekedRowGroup.count <= maxReadBatchSizeRows) { - val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, - peekedRowGroup.count) - if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) { - currentChunk += blockIter.next() - numRows += currentChunk.last.count - numAvroBytes += currentChunk.last.count - numBytes += estimatedBytes - readNextBatch() - } - } - } - } +/** Extra information: codec */ +case class AvroExtraInfo() extends ExtraInfo - readNextBatch() - logDebug(s"Loaded $numRows rows from Avro. bytes read: $numAvroBytes. 
" + - s"Estimated GPU bytes: $numBytes") - currentChunk - } +/** avro schema wrapper */ +case class AvroSchemaWrapper(schema: Schema) extends SchemaBase + +/** avro BlockInfo wrapper */ +case class AvroDataBlock(blockInfo: BlockInfo) extends DataBlockBase { + override def getRowCount: Long = blockInfo.count + override def getReadDataSize: Long = blockInfo.blockDataSize + override def getBlockSize: Long = blockInfo.blockLength } + +case class AvroSingleDataBlockInfo( + filePath: Path, + dataBlock: AvroDataBlock, + partitionValues: InternalRow, + schema: AvroSchemaWrapper, + extraInfo: AvroExtraInfo) extends SingleDataBlockInfo diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index d37a6608f2b..b638749e93b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -79,6 +79,8 @@ case class GpuFileSourceScanExec( private val isPerFileReadEnabled = relation.fileFormat match { case _: ParquetFileFormat => rapidsConf.isParquetPerFileReadEnabled case _: OrcFileFormat => rapidsConf.isOrcPerFileReadEnabled + case ef if ExternalSource.isSupportedFormat(ef) => + ExternalSource.isPerFileReadEnabledForFormat(ef, rapidsConf) case _ => true // For others, default to PERFILE reader } @@ -579,9 +581,14 @@ case class GpuFileSourceScanExec( rapidsConf, allMetrics, queryUsesInputFile) - case _ => - // never reach here - throw new RuntimeException(s"File format ${relation.fileFormat} is not supported yet") + case ef if ExternalSource.isSupportedFormat(ef) => + ExternalSource.createMultiFileReaderFactory( + ef, + broadcastedHadoopConf, + pushedDownFilters.toArray, + this) + case other => + throw new IllegalArgumentException(s"${other.getClass.getCanonicalName} is not supported") } } @@ -615,7 +622,10 @@ object GpuFileSourceScanExec { case f if GpuOrcFileFormat.isSparkOrcFormat(f) => GpuReadOrcFileFormat.tagSupport(meta) case _: ParquetFileFormat => GpuReadParquetFileFormat.tagSupport(meta) case _: JsonFileFormat => GpuReadJsonFileFormat.tagSupport(meta) - case _ => ExternalSource.tagSupportForGpuFileSourceScanExec(meta) + case ef if ExternalSource.isSupportedFormat(ef) => + ExternalSource.tagSupportForGpuFileSourceScan(meta) + case other => + meta.willNotWorkOnGpu(s"unsupported file format: ${other.getClass.getCanonicalName}") } } @@ -625,7 +635,10 @@ object GpuFileSourceScanExec { case f if GpuOrcFileFormat.isSparkOrcFormat(f) => new GpuReadOrcFileFormat case _: ParquetFileFormat => new GpuReadParquetFileFormat case _: JsonFileFormat => new GpuReadJsonFileFormat - case _ => ExternalSource.convertFileFormatForGpuFileSourceScanExec(format) + case ef if ExternalSource.isSupportedFormat(ef) => ExternalSource.getReadFileFormat(ef) + case other => + throw new IllegalArgumentException(s"${other.getClass.getCanonicalName} is not supported") + } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala index 28f67cde860..825e3bc36bd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuReadAvroFileFormat.scala @@ -50,17 +50,14 @@ class GpuReadAvroFileFormat extends AvroFileFormat with GpuReadFileFormatWithMet val broadcastedHadoopConf = 
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val parsedOptions = new AvroOptions(options, hadoopConf) - val ignoreExtension = parsedOptions.ignoreExtension - val factory = GpuAvroPartitionReaderFactory( sqlConf, + new RapidsConf(sqlConf), broadcastedHadoopConf, dataSchema, requiredSchema, partitionSchema, - new RapidsConf(sqlConf), - ignoreExtension, + new AvroOptions(options, hadoopConf), metrics) PartitionReaderIterator.buildReader(factory) } diff --git a/tests/pom.xml b/tests/pom.xml index 630f35704f3..975ba3435a6 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -82,6 +82,12 @@ spark-sql_${scala.binary.version} ${spark.test.version} + + org.apache.spark + spark-avro_${scala.binary.version} + ${spark.version} + provided + @@ -181,6 +187,24 @@ ${spark.version} provided + + org.apache.spark + spark-avro_${scala.binary.version} + ${spark.version} + provided + + + org.apache.avro + avro-mapred + ${spark.version} + provided + + + org.apache.avro + avro + ${spark.version} + provided + org.apache.commons commons-io diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderSuites.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala similarity index 51% rename from tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderSuites.scala rename to tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala index 9aea9868a24..b48366d6fe7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderSuites.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuReaderTypeSuites.scala @@ -16,43 +16,51 @@ package com.nvidia.spark.rapids +import com.nvidia.spark.rapids.RapidsReaderType._ import com.nvidia.spark.rapids.shims.GpuBatchScanExec import org.apache.spark.SparkConf import org.apache.spark.sql.FileUtils.withTempPath import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.functions.input_file_name -import org.apache.spark.sql.rapids.GpuFileSourceScanExec +import org.apache.spark.sql.rapids.{ExternalSource, GpuFileSourceScanExec} -trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { +trait ReaderTypeSuite extends SparkQueryCompareTestSuite with Arm { /** File format */ protected def format: String - /** Check if use multithreaded reading */ - private def checkMultiThreadedReading( + protected def otherConfs: Iterable[(String, String)] = Seq.empty + + private def checkReaderType( readerFactory: PartitionReaderFactory, inputFile: Array[String], - expected: Boolean) = { - readerFactory match { - case factory: MultiFilePartitionReaderFactoryBase => - assert(factory.useMultiThread(inputFile) == expected) - case _ => assert(false, "PERFILE is Use") + expectedReaderType: RapidsReaderType) = { + val actualReaderType = readerFactory match { + case mf: MultiFilePartitionReaderFactoryBase => + if (mf.useMultiThread(inputFile)) { + MULTITHREADED + } else { + COALESCING + } + case _ => PERFILE } + assert(expectedReaderType == actualReaderType, + s", expected $expectedReaderType, but got $actualReaderType") } /** - * Test if multithreaded reading is Use. + * Test if the given reader type will be used. * * @param conf SparkConf * @param files input files * @param multithreadedReadingExpected true: multithreaded reading, false: coalescing reading. 
* @param hasInputExpression if has input expression */ - private def testMultiThreadedReader( + protected final def testReaderType( conf: SparkConf, files: Array[String], - multithreadedReadingExpected: Boolean, + expectedReaderType: RapidsReaderType, hasInputExpression: Boolean = false): Unit = { withTempPath { file => withCpuSparkSession(spark => { @@ -65,23 +73,26 @@ trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { val df = if (hasInputExpression) rawDf.withColumn("input", input_file_name()) else rawDf val plans = df.queryExecution.executedPlan.collect { case plan: GpuBatchScanExec => - checkMultiThreadedReading(plan.readerFactory, files, multithreadedReadingExpected) + checkReaderType(plan.readerFactory, files, expectedReaderType) plan case plan: GpuFileSourceScanExec => - checkMultiThreadedReading(plan.readerFactory, files, multithreadedReadingExpected) + checkReaderType(plan.readerFactory, files, expectedReaderType) plan } assert(!plans.isEmpty, "File reader is not running on GPU") - }, conf) + }, conf.setAll(otherConfs)) } } +} + +trait MultiReaderTypeSuite extends ReaderTypeSuite { test("Use coalescing reading for local files") { val testFile = Array("/tmp/xyz") Seq(format, "").foreach(useV1Source => { val conf = new SparkConf() .set("spark.sql.sources.useV1SourceList", useV1Source) - testMultiThreadedReader(conf, testFile, false) + testReaderType(conf, testFile, COALESCING) }) } @@ -90,7 +101,7 @@ trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { Seq(format, "").foreach(useV1Source => { val conf = new SparkConf() .set("spark.sql.sources.useV1SourceList", useV1Source) - testMultiThreadedReader(conf, testFile, true) + testReaderType(conf, testFile, MULTITHREADED) }) } @@ -100,7 +111,7 @@ trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { val conf = new SparkConf() .set("spark.sql.sources.useV1SourceList", useV1Source) .set(s"spark.rapids.sql.format.${format}.reader.type", "COALESCING") - testMultiThreadedReader(conf, testFile, false) + testReaderType(conf, testFile, COALESCING) }) } @@ -110,7 +121,7 @@ trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { val conf = new SparkConf() .set("spark.sql.sources.useV1SourceList", useV1Source) .set(s"spark.rapids.sql.format.${format}.reader.type", "MULTITHREADED") - testMultiThreadedReader(conf, testFile, true) + testReaderType(conf, testFile, MULTITHREADED) }) } @@ -120,7 +131,7 @@ trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { val conf = new SparkConf() .set("spark.sql.sources.useV1SourceList", useV1Source) .set(s"spark.rapids.sql.format.${format}.reader.type", "COALESCING") - testMultiThreadedReader(conf, testFile, true, true) + testReaderType(conf, testFile, MULTITHREADED, hasInputExpression=true) }) } @@ -131,15 +142,78 @@ trait FileSourceSuite extends SparkQueryCompareTestSuite with Arm { .set("spark.sql.sources.useV1SourceList", useV1Source) .set("spark.sql.files.ignoreCorruptFiles", "true") .set(s"spark.rapids.sql.format.${format}.reader.type", "COALESCING") - testMultiThreadedReader(conf, testFile, true ) + testReaderType(conf, testFile, MULTITHREADED) }) } } -class GpuParquetReaderSuites extends FileSourceSuite { +class GpuParquetReaderTypeSuites extends MultiReaderTypeSuite { override protected def format: String = "parquet" } -class GpuOrcReaderSuites extends FileSourceSuite { +class GpuOrcReaderTypeSuites extends MultiReaderTypeSuite { override protected def format: String = "orc" } + +class GpuAvroReaderTypeSuites extends ReaderTypeSuite { 
+ override lazy val format: String = "avro" + override lazy val otherConfs: Iterable[(String, String)] = Seq( + ("spark.rapids.sql.format.avro.read.enabled", "true"), + ("spark.rapids.sql.format.avro.enabled", "true")) + + private lazy val hasAvroJar = ExternalSource.hasSparkAvroJar + + test("Use coalescing reading for local files") { + assume(hasAvroJar) + val testFile = Array("/tmp/xyz") + Seq(format, "").foreach(useV1Source => { + val conf = new SparkConf() + .set("spark.sql.sources.useV1SourceList", useV1Source) + testReaderType(conf, testFile, COALESCING) + }) + } + + test("Use coalescing reading for cloud files if coalescing can work") { + assume(hasAvroJar) + val testFile = Array("s3:/tmp/xyz") + Seq(format, "").foreach(useV1Source => { + val conf = new SparkConf() + .set("spark.sql.sources.useV1SourceList", useV1Source) + testReaderType(conf, testFile, COALESCING) + }) + } + + test("Force coalescing reading for cloud files when setting COALESCING ") { + assume(hasAvroJar) + val testFile = Array("s3:/tmp/xyz") + Seq(format, "").foreach(useV1Source => { + val conf = new SparkConf() + .set("spark.sql.sources.useV1SourceList", useV1Source) + .set(s"spark.rapids.sql.format.${format}.reader.type", "COALESCING") + testReaderType(conf, testFile, COALESCING) + }) + } + + test("Use per-file reading for input expression even setting COALESCING") { + assume(hasAvroJar) + val testFile = Array("/tmp/xyz") + Seq(format, "").foreach(useV1Source => { + val conf = new SparkConf() + .set("spark.sql.sources.useV1SourceList", useV1Source) + .set(s"spark.rapids.sql.format.${format}.reader.type", "COALESCING") + testReaderType(conf, testFile, PERFILE, hasInputExpression=true) + }) + } + + test("Use per-file reading for ignoreCorruptFiles even setting COALESCING") { + assume(hasAvroJar) + val testFile = Array("/tmp/xyz") + Seq(format, "").foreach(useV1Source => { + val conf = new SparkConf() + .set("spark.sql.sources.useV1SourceList", useV1Source) + .set("spark.sql.files.ignoreCorruptFiles", "true") + .set(s"spark.rapids.sql.format.${format}.reader.type", "COALESCING") + testReaderType(conf, testFile, PERFILE) + }) + } +} From 9009e5e451c034cf1e869b09022fb32757d61521 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 25 Apr 2022 20:03:31 +0800 Subject: [PATCH 2/8] Remove unused code Signed-off-by: Firestarman --- .../org/apache/spark/sql/rapids/GpuAvroScan.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index ff4b15b0a75..b6724bbbfe5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -565,15 +565,8 @@ class GpuMultiFileAvroPartitionReader( nextBlockInfo: SingleDataBlockInfo): Boolean = { val nextHeader = mapPathHeader.get(nextBlockInfo.filePath).get val curHeader = mapPathHeader.get(currentBlockInfo.filePath).get - // Split into another block when - // 1) the sync markers are different, or - // 2) a key exists in both of the two headers' metadata, and maps to different values. 
- //if (!Header.hasSameSync(nextHeader, curHeader)) { - // logInfo(s"Avro sync marker in the next file ${nextBlockInfo.filePath}" + - // s" differs from the current one in file ${currentBlockInfo.filePath}," + - // s" splitting it into a new batch!") - // return true - //} + // Split into another block when a key exists in both of the two headers' metadata, + // and maps to different values. if (Header.hasConflictInMetadata(nextHeader, curHeader)) { logInfo(s"Avro metadata in the next file ${nextBlockInfo.filePath}" + s" conflicts with the current one in file ${currentBlockInfo.filePath}," + From f1554df93eb91726add91317d2343d00b078cc22 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 29 Apr 2022 12:09:18 +0800 Subject: [PATCH 3/8] address the comments Signed-off-by: Firestarman --- .../src/main/python/avro_test.py | 42 ++++----- pom.xml | 6 ++ sql-plugin/pom.xml | 2 - .../nvidia/spark/rapids/AvroDataReader.scala | 94 ++++++++++--------- .../spark/rapids/GpuMultiFileReader.scala | 2 +- .../apache/spark/sql/rapids/GpuAvroScan.scala | 28 +++--- tests/pom.xml | 8 +- 7 files changed, 91 insertions(+), 91 deletions(-) diff --git a/integration_tests/src/main/python/avro_test.py b/integration_tests/src/main/python/avro_test.py index 43603563bf3..80628c50560 100644 --- a/integration_tests/src/main/python/avro_test.py +++ b/integration_tests/src/main/python/avro_test.py @@ -32,15 +32,22 @@ rapids_reader_types = ['PERFILE', 'COALESCING'] +# 50 files for the coalescing reading case +coalescingPartitionNum = 50 + +def gen_avro_files(gen_list, out_path): + with_cpu_session( + lambda spark: gen_df(spark, + gen_list).repartition(coalescingPartitionNum).write.format("avro").save(out_path) + ) + + @pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"]) @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)] data_path = spark_tmp_path + '/AVRO_DATA' - # 50 files for the coalescing reading case - with_cpu_session( - lambda spark: gen_df(spark, gen_list).repartition(50).write.format("avro").save(data_path) - ) + gen_avro_files(gen_list, data_path) all_confs = copy_and_update(_enable_all_types_conf, { 'spark.rapids.sql.format.avro.reader.type': reader_type, @@ -54,20 +61,11 @@ def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type): @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_type): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)] - first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20' - with_cpu_session( - lambda spark: gen_df(spark, - gen_list).repartition(50).write.format("avro").save(first_data_path)) - second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21' - with_cpu_session( - lambda spark: gen_df(spark, - gen_list).repartition(50).write.format("avro").save(second_data_path)) - third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22' - with_cpu_session( - lambda spark: gen_df(spark, - gen_list).repartition(50).write.format("avro").save(third_data_path)) - data_path = spark_tmp_path + '/AVRO_DATA' + # generate partitioned files + for v in [0, 1, 2]: + out_path = data_path + '/key={}/key2=2{}'.format(v, v) + gen_avro_files(gen_list, out_path) all_confs = copy_and_update(_enable_all_types_conf, { 'spark.rapids.sql.format.avro.reader.type': reader_type, @@ -80,13 +78,11 @@ def 
test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_ty @pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"]) @pytest.mark.parametrize('reader_type', rapids_reader_types) def test_avro_input_meta(spark_tmp_path, v1_enabled_list, reader_type): - first_data_path = spark_tmp_path + '/AVRO_DATA/key=0' - with_cpu_session( - lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path)) - second_data_path = spark_tmp_path + '/AVRO_DATA/key=1' - with_cpu_session( - lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(second_data_path)) data_path = spark_tmp_path + '/AVRO_DATA' + for v in [0, 1]: + out_path = data_path + '/key={}'.format(v) + with_cpu_session( + lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(out_path)) all_confs = copy_and_update(_enable_all_types_conf, { 'spark.rapids.sql.format.avro.reader.type': reader_type, diff --git a/pom.xml b/pom.xml index f1ae797b8a9..a4c031fffa2 100644 --- a/pom.xml +++ b/pom.xml @@ -911,6 +911,12 @@ ${mockito.version} test + + org.apache.spark + spark-avro_${scala.binary.version} + ${spark.version} + provided + diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml index c010997d3a7..026c4260b7b 100644 --- a/sql-plugin/pom.xml +++ b/sql-plugin/pom.xml @@ -59,8 +59,6 @@ org.apache.spark spark-avro_${scala.binary.version} - ${spark.version} - provided diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala index 0672076396b..3f3b3105a8c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala @@ -22,7 +22,8 @@ import java.nio.charset.StandardCharsets import scala.collection.mutable import org.apache.avro.Schema -import org.apache.avro.file.{DataFileConstants, SeekableInput} +import org.apache.avro.file.DataFileConstants._ +import org.apache.avro.file.SeekableInput import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory} import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} @@ -61,22 +62,17 @@ private class SeekableInputStream(in: SeekableInput) extends InputStream with Se /** * The header information of an Avro file. */ -class Header private[rapids] { - private[rapids] val meta = mutable.Map[String, Array[Byte]]() - private[rapids] val sync = new Array[Byte](DataFileConstants.SYNC_SIZE) - private[rapids] var headerSize: Option[Long] = None +case class Header( + meta: Map[String, Array[Byte]], + // Array in scala is mutable, so keep it private to avoid unexpected update. + private val syncBuffer: Array[Byte]) { - def firstBlockStart: Long = headerSize.getOrElse { - val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM) - AvroFileWriter(out).writeHeader(this) - val newSize = out.getByteCount - headerSize = Some(newSize) - newSize - } + /** Get a copy of the sync marker. 
*/ + def sync: Array[Byte] = syncBuffer.clone @transient lazy val schema: Schema = { - getMetaString(DataFileConstants.SCHEMA) + getMetaString(SCHEMA) .map(s => new Schema.Parser().setValidateDefaults(false).setValidate(false).parse(s)) .orNull } @@ -87,39 +83,38 @@ class Header private[rapids] { } object Header { + /** Compute header size in bytes for serialization */ + def headerSizeInBytes(h: Header): Long = { + val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM) + AvroFileWriter(out).writeHeader(h) + out.getByteCount + } + /** * Merge the metadata of the given headers. * Note: It does not check the compatibility of the headers. * @param headers whose metadata to be merged. - * @return the first header but having the new merged metadata, or - * None if the input is empty. + * @return a header with the new merged metadata and the first header's + * sync marker, or None if the input is empty. */ def mergeMetadata(headers: Seq[Header]): Option[Header] = { if (headers.isEmpty) { None - } else if (headers.size == 1) { - Some(headers.head) } else { - val mergedHeader = headers.reduce { (merged, h) => - merged.meta ++= h.meta - merged + val mergedMeta = headers.map(_.meta).reduce { (merged, meta) => + merged ++ meta } - // need to re-compute the header size - mergedHeader.headerSize = None - Some(mergedHeader) + Some(Header(mergedMeta, headers.head.sync)) } } - /** Test whether the two headers have the same sync marker */ - def hasSameSync(h1: Header, h2: Header): Boolean = h1.sync.sameElements(h2.sync) - /** * Test whether the two headers have conflicts in the metadata. * A conflict means a key exists in both of the two headers' metadata, * and maps to different values. */ def hasConflictInMetadata(h1: Header, h2: Header): Boolean = h1.meta.exists { - case (k, v) => h2.meta.contains(k) && !h2.meta.get(k).get.sameElements(v) + case (k, v) => h2.meta.get(k).exists(_.sameElements(v)) } } @@ -140,29 +135,36 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { private val sin = new SeekableInputStream(si) sin.seek(0) // seek to the start of file and get some meta info. 
private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin); - private val header: Header = new Header() + private var header: Header = null private var firstBlockStart: Long = 0 // store all blocks info - private val blocks: mutable.ArrayBuffer[BlockInfo] = mutable.ArrayBuffer.empty + private var blocks: Option[Seq[BlockInfo]] = None initialize() - def getBlocks(): Seq[BlockInfo] = blocks.toSeq - def getHeader(): Header = header - private def initialize() = { - val magic = new Array[Byte](DataFileConstants.MAGIC.length) - vin.readFixed(magic) + def getHeaderSize(): Long = firstBlockStart + + def getBlocks(): Seq[BlockInfo] = blocks.getOrElse { + val b = parseBlocks() + blocks = Some(b) + b + } + private def initialize(): Unit = { + // read magic + val magic = new Array[Byte](MAGIC.length) + vin.readFixed(magic) magic match { case Array(79, 98, 106, 1) => // current avro format case Array(79, 98, 106, 0) => // old format throw new UnsupportedOperationException("avro 1.2 format is not support by GPU") case _ => throw new RuntimeException("Not an Avro data file.") } - + // read metadata map + val meta = mutable.Map[String, Array[Byte]]() var l = vin.readMapStart().toInt if (l > 0) { do { @@ -171,15 +173,16 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { val value = vin.readBytes(null) val bb = new Array[Byte](value.remaining()) value.get(bb) - header.meta += (key -> bb) + meta += (key -> bb) } l = vin.mapNext().toInt } while (l != 0) } - vin.readFixed(header.sync) - firstBlockStart = sin.tell - vin.inputStream.available // get the first block Start address - header.headerSize = Some(firstBlockStart) - parseBlocks() + // read sync marker + val sync = new Array[Byte](SYNC_SIZE) + vin.readFixed(sync) + header = Header(meta.toMap, sync) + firstBlockStart = sin.tell - vin.inputStream.available } private def seek(position: Long): Unit = { @@ -187,18 +190,19 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { vin = DecoderFactory.get().binaryDecoder(this.sin, vin); } - private def parseBlocks(): Unit = { + private def parseBlocks(): Seq[BlockInfo] = { if (firstBlockStart >= sin.length() || vin.isEnd()) { // no blocks - return + return Seq.empty } + val blocks = mutable.ArrayBuffer.empty[BlockInfo] // buf is used for writing long val buf = new Array[Byte](12) var blockStart = firstBlockStart while (blockStart < sin.length()) { seek(blockStart) if (vin.isEnd()) { - return + return blocks.toSeq } val blockCount = vin.readLong() val blockDataSize = vin.readLong() @@ -211,13 +215,13 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { val blockDataSizeLen: Int = BinaryData.encodeLong(blockDataSize, buf, 0) // (len of entries) + (len of block size) + (block size) + (sync size) - val blockLength = blockCountLen + blockDataSizeLen + blockDataSize + - DataFileConstants.SYNC_SIZE + val blockLength = blockCountLen + blockDataSizeLen + blockDataSize + SYNC_SIZE blocks += BlockInfo(blockStart, blockLength, blockDataSize, blockCount) // Do we need to check the SYNC BUFFER, or just let cudf do it? 
blockStart += blockLength } + blocks.toSeq } override def close(): Unit = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index 5cde655b271..39f3a98f541 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -715,7 +715,7 @@ abstract class MultiFileCoalescingPartitionReaderBase( * Write a header for a specific file format. If there is no header for the file format, * just ignore it and return 0 * - * @param paths the paths of files to be coalcesed into a single batch + * @param paths the paths of files to be coalesced into a single batch * @param buffer where the header will be written * @return how many bytes written */ diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index b6724bbbfe5..d6a4adf8b70 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -21,7 +21,6 @@ import java.net.URI import java.util.concurrent.{Callable, ThreadPoolExecutor} import scala.annotation.tailrec -import scala.collection.JavaConverters._ import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} import scala.language.implicitConversions @@ -124,7 +123,7 @@ case class GpuAvroScan( rapidsConf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, parsedOptions, metrics, pushedFilters, queryUsesInputFile) // Now only coalescing is supported, so need to check it can be used for the final choice. 
- if (f.canUseCoalesceFilesReader){ + if (f.canUseCoalesceFilesReader) { f } else { // Fall back to PerFile reading @@ -203,7 +202,6 @@ case class GpuAvroMultiFilePartitionReaderFactory( private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles private val numThreads = rapidsConf.avroMultiThreadReadNumThreads - private val maxNumFileProcessed = rapidsConf.maxNumAvroFilesParallel // we can't use the coalescing files reader when InputFileName, InputFileBlockStart, // or InputFileBlockLength because we are combining all the files into a single buffer @@ -248,13 +246,13 @@ case class GpuAvroMultiFilePartitionReaderFactory( } catch { case e: FileNotFoundException if ignoreMissingFiles => logWarning(s"Skipped missing file: ${file.filePath}", e) - AvroBlockMeta(null, Seq.empty) + AvroBlockMeta(null, 0L, Seq.empty) // Throw FileNotFoundException even if `ignoreCorruptFiles` is true case e: FileNotFoundException if !ignoreMissingFiles => throw e case e @(_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e) - AvroBlockMeta(null, Seq.empty) + AvroBlockMeta(null, 0L, Seq.empty) } val fPath = new Path(new URI(file.filePath)) clippedBlocks ++= singleFileInfo.blocks.map(block => @@ -382,7 +380,7 @@ trait GpuAvroReaderBase extends Arm with Logging { self: FilePartitionReaderBase protected final def readPartFile( partFilePath: Path, blocks: Seq[BlockInfo], - header: Header, + headerSize: Long, conf: Configuration): (HostMemoryBuffer, Long) = { withResource(new NvtxWithMetrics("Avro buffer file split", NvtxColor.YELLOW, metrics("bufferTime"))) { _ => @@ -390,11 +388,11 @@ trait GpuAvroReaderBase extends Arm with Logging { self: FilePartitionReaderBase // No need to check the header here since it can not be null when blocks is not empty. 
return (null, 0L) } - val estOutSize = estimateOutputSize(blocks, header) + val estOutSize = estimateOutputSize(blocks, headerSize) withResource(partFilePath.getFileSystem(conf).open(partFilePath)) { in => closeOnExcept(HostMemoryBuffer.allocate(estOutSize)) { hmb => withResource(new HostMemoryOutputStream(hmb)) { out => - val headerAndBlocks = BlockInfo(0, header.firstBlockStart, 0, 0) +: blocks + val headerAndBlocks = BlockInfo(0, headerSize, 0, 0) +: blocks copyBlocksData(headerAndBlocks, in, out) // check we didn't go over memory if (out.getPos > estOutSize) { @@ -409,9 +407,9 @@ trait GpuAvroReaderBase extends Arm with Logging { self: FilePartitionReaderBase } /** Estimate the total size from the given blocks and header */ - protected final def estimateOutputSize(blocks: Seq[BlockInfo], header: Header): Long = { + protected final def estimateOutputSize(blocks: Seq[BlockInfo], headerSize: Long): Long = { // Start from the Header - var totalSize: Long = header.firstBlockStart + var totalSize: Long = headerSize // Add all blocks totalSize += blocks.map(_.blockLength).sum totalSize @@ -530,7 +528,7 @@ class GpuAvroPartitionReader( None } else { val (dataBuffer, dataSize) = readPartFile(partFilePath, currentChunkedBlocks, - blockMeta.header, conf) + blockMeta.headerSize, conf) sendToGpu(dataBuffer, dataSize, Array(partFile)) } } @@ -586,7 +584,7 @@ class GpuMultiFileAvroPartitionReader( // in 'checkIfNeedToSplitDataBlock' Header.mergeMetadata(headers.toSeq).map { mergedHeader => val allBlocks = blocks.values.flatten.toSeq - estimateOutputSize(allBlocks, mergedHeader) + estimateOutputSize(allBlocks, Header.headerSizeInBytes(mergedHeader)) } getOrElse 0L } @@ -710,11 +708,11 @@ case class AvroFileFilterHandler( filteredBlocks.append(block) } }) - AvroBlockMeta(reader.getHeader(), filteredBlocks) + AvroBlockMeta(reader.getHeader, reader.getHeaderSize, filteredBlocks) } } } else { - AvroBlockMeta(null, Seq.empty) + AvroBlockMeta(null, 0L, Seq.empty) } } } @@ -725,7 +723,7 @@ case class AvroFileFilterHandler( * @param header the header of avro file * @param blocks the total block info of avro file */ -case class AvroBlockMeta(header: Header, blocks: Seq[BlockInfo]) +case class AvroBlockMeta(header: Header, headerSize: Long, blocks: Seq[BlockInfo]) /** * CopyRange to indicate from where to copy. diff --git a/tests/pom.xml b/tests/pom.xml index 975ba3435a6..838a9a75bac 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -85,8 +85,6 @@ org.apache.spark spark-avro_${scala.binary.version} - ${spark.version} - provided @@ -191,19 +189,19 @@ org.apache.spark spark-avro_${scala.binary.version} ${spark.version} - provided + provided org.apache.avro avro-mapred ${spark.version} - provided + provided org.apache.avro avro ${spark.version} - provided + provided org.apache.commons From 9f7e8931205cbde05b1c00960011ac6895901309 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 29 Apr 2022 21:56:00 +0800 Subject: [PATCH 4/8] A small fix Signed-off-by: Firestarman --- .../src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala index 3f3b3105a8c..0a697facf83 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala @@ -114,7 +114,7 @@ object Header { * and maps to different values. 
*/ def hasConflictInMetadata(h1: Header, h2: Header): Boolean = h1.meta.exists { - case (k, v) => h2.meta.get(k).exists(_.sameElements(v)) + case (k, v) => h2.meta.get(k).exists(!_.sameElements(v)) } } From 22a45d437f12ce8f659562b03df279043902b718 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 2 May 2022 17:00:31 +0800 Subject: [PATCH 5/8] Address the new comments Signed-off-by: Firestarman --- pom.xml | 12 +++++----- ...aReader.scala => AvroDataFileReader.scala} | 22 +++++-------------- .../apache/spark/sql/rapids/GpuAvroScan.scala | 4 ++-- 3 files changed, 14 insertions(+), 24 deletions(-) rename sql-plugin/src/main/scala/com/nvidia/spark/rapids/{AvroDataReader.scala => AvroDataFileReader.scala} (94%) diff --git a/pom.xml b/pom.xml index a4c031fffa2..510112c32c2 100644 --- a/pom.xml +++ b/pom.xml @@ -882,6 +882,12 @@ ${spark.version} provided + + org.apache.spark + spark-avro_${scala.binary.version} + ${spark.version} + provided + com.google.flatbuffers flatbuffers-java @@ -911,12 +917,6 @@ ${mockito.version} test - - org.apache.spark - spark-avro_${scala.binary.version} - ${spark.version} - provided - diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala similarity index 94% rename from sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala index 0a697facf83..26c8459c824 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala @@ -135,25 +135,15 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { private val sin = new SeekableInputStream(si) sin.seek(0) // seek to the start of file and get some meta info. 
private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin); - private var header: Header = null private var firstBlockStart: Long = 0 - // store all blocks info - private var blocks: Option[Seq[BlockInfo]] = None - - initialize() - - def getHeader(): Header = header + val header: Header = initialize() - def getHeaderSize(): Long = firstBlockStart - - def getBlocks(): Seq[BlockInfo] = blocks.getOrElse { - val b = parseBlocks() - blocks = Some(b) - b - } + lazy val headerSize: Long = firstBlockStart + // store all blocks info + lazy val blocks: Seq[BlockInfo] = parseBlocks() - private def initialize(): Unit = { + private def initialize(): Header = { // read magic val magic = new Array[Byte](MAGIC.length) vin.readFixed(magic) @@ -181,8 +171,8 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { // read sync marker val sync = new Array[Byte](SYNC_SIZE) vin.readFixed(sync) - header = Header(meta.toMap, sync) firstBlockStart = sin.tell - vin.inputStream.available + Header(meta.toMap, sync) } private def seek(position: Long): Unit = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index d6a4adf8b70..78986a0eeec 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -700,7 +700,7 @@ case class AvroFileFilterHandler( val in = new FsInput(new Path(new URI(partFile.filePath)), hadoopConf) closeOnExcept(in) { _ => withResource(AvroDataFileReader.openReader(in)) { reader => - val blocks = reader.getBlocks() + val blocks = reader.blocks val filteredBlocks = new ArrayBuffer[BlockInfo]() blocks.foreach(block => { if (partFile.start <= block.blockStart - SYNC_SIZE && @@ -708,7 +708,7 @@ case class AvroFileFilterHandler( filteredBlocks.append(block) } }) - AvroBlockMeta(reader.getHeader, reader.getHeaderSize, filteredBlocks) + AvroBlockMeta(reader.header, reader.headerSize, filteredBlocks) } } } else { From ce84a873828fb654534e4925d1ae3da851705496 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 3 May 2022 09:43:11 +0800 Subject: [PATCH 6/8] Add the latest comment Signed-off-by: Firestarman --- .../nvidia/spark/rapids/AvroDataFileReader.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala index 26c8459c824..88c12744529 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala @@ -135,15 +135,13 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { private val sin = new SeekableInputStream(si) sin.seek(0) // seek to the start of file and get some meta info. 
   private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin);
-  private var firstBlockStart: Long = 0
 
-  val header: Header = initialize()
+  val (header, headerSize): (Header, Long) = initialize()
 
-  lazy val headerSize: Long = firstBlockStart
   // store all blocks info
   lazy val blocks: Seq[BlockInfo] = parseBlocks()
 
-  private def initialize(): Header = {
+  private def initialize(): (Header, Long) = {
     // read magic
     val magic = new Array[Byte](MAGIC.length)
     vin.readFixed(magic)
@@ -171,8 +169,7 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
     // read sync marker
     val sync = new Array[Byte](SYNC_SIZE)
     vin.readFixed(sync)
-    firstBlockStart = sin.tell - vin.inputStream.available
-    Header(meta.toMap, sync)
+    (Header(meta.toMap, sync), sin.tell - vin.inputStream.available)
   }
 
   private def seek(position: Long): Unit = {
@@ -181,14 +178,14 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
   }
 
   private def parseBlocks(): Seq[BlockInfo] = {
-    if (firstBlockStart >= sin.length() || vin.isEnd()) {
+    var blockStart = headerSize
+    if (blockStart >= sin.length() || vin.isEnd()) {
       // no blocks
       return Seq.empty
     }
     val blocks = mutable.ArrayBuffer.empty[BlockInfo]
     // buf is used for writing long
     val buf = new Array[Byte](12)
-    var blockStart = firstBlockStart
     while (blockStart < sin.length()) {
       seek(blockStart)
       if (vin.isEnd()) {
From 6212b9f9683897649256cb695993dcd2dff46780 Mon Sep 17 00:00:00 2001
From: Firestarman
Date: Tue, 3 May 2022 09:53:57 +0800
Subject: [PATCH 7/8] Remove the unused config

Signed-off-by: Firestarman
---
 .../scala/com/nvidia/spark/rapids/RapidsConf.scala | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index b0c7e9b5b8e..05afab07bfe 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -964,16 +964,6 @@ object RapidsConf {
       .integerConf
       .createWithDefault(20)
 
-  val AVRO_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL =
-    conf("spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel")
-      .doc("A limit on the maximum number of files per task processed in parallel on the CPU " +
-        "side before the file is sent to the GPU. This affects the amount of host memory used " +
-        "when reading the files in parallel. 
Used with MULTITHREADED reader, see " + - "spark.rapids.sql.format.avro.reader.type") - .integerConf - .checkValue(v => v > 0, "The maximum number of files must be greater than 0.") - .createWithDefault(Integer.MAX_VALUE) - val ENABLE_RANGE_WINDOW_BYTES = conf("spark.rapids.sql.window.range.byte.enabled") .doc("When the order-by column of a range based window is byte type and " + "the range boundary calculated for a value has overflow, CPU and GPU will get " + @@ -1757,8 +1747,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val avroMultiThreadReadNumThreads: Int = get(AVRO_MULTITHREAD_READ_NUM_THREADS) - lazy val maxNumAvroFilesParallel: Int = get(AVRO_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL) - lazy val shuffleManagerEnabled: Boolean = get(SHUFFLE_MANAGER_ENABLED) lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE) From 3ab0ac9d36883705a56b07278a1e5f50ce12f225 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 3 May 2022 10:15:45 +0800 Subject: [PATCH 8/8] Update the doc Signed-off-by: Firestarman --- docs/configs.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/configs.md b/docs/configs.md index f59de6c6fe8..a69b47d8411 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -75,7 +75,6 @@ Name | Description | Default Value spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false -spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type|2147483647 spark.rapids.sql.format.avro.multiThreadedRead.numThreads|The maximum number of threads, on one executor, to use for reading small avro files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type.|20 spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false spark.rapids.sql.format.avro.reader.type|Sets the avro reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using COALESCING is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.avro.multiThreadedRead.numThreads. By default this is set to AUTO so we select the reader we think is best. This will be COALESCING.|AUTO
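
A minimal usage sketch for the reader type introduced by this series, mirroring the settings exercised in 'avro_test.py'. It is illustrative only: the session setup and the data path are assumptions, and it presumes the RAPIDS Accelerator plugin and spark-avro are already on the classpath and enabled.

# A minimal PySpark sketch, assuming the RAPIDS Accelerator and spark-avro are
# already on the classpath and the plugin is enabled; the path below is hypothetical.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("gpu-avro-coalescing-sketch")
         # Both flags must be true before the GPU Avro reader is considered.
         .config("spark.rapids.sql.format.avro.enabled", "true")
         .config("spark.rapids.sql.format.avro.read.enabled", "true")
         # Reader type added by this series: PERFILE or COALESCING
         # (AUTO, the default, currently resolves to COALESCING).
         .config("spark.rapids.sql.format.avro.reader.type", "COALESCING")
         .getOrCreate())

# With COALESCING, the small files assigned to a task are copied into a single
# host buffer per batch (merged header plus block data) before GPU decoding.
df = spark.read.format("avro").load("/tmp/avro_small_files")
print(df.count())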