From 9b29da688526937f6fd520b2e770125cca550069 Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Fri, 12 Jan 2024 18:56:21 +0800
Subject: [PATCH] Dump Parquet Meta as SparkMetrics

Signed-off-by: sperlingxx
---
 .../rapids/GpuBatchScanExecMetrics.scala      |  14 ++-
 .../com/nvidia/spark/rapids/GpuExec.scala     |   3 +
 .../spark/rapids/GpuMultiFileReader.scala     |  88 ++++++++++++---
 .../nvidia/spark/rapids/GpuParquetScan.scala  |  28 ++++-
 .../util/rapids/ParquetMetaUtils.scala        | 106 ++++++++++++++++++
 .../sql/rapids/GpuFileSourceScanExec.scala    |  12 +-
 6 files changed, 229 insertions(+), 22 deletions(-)
 create mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/util/rapids/ParquetMetaUtils.scala

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala
index ef890b1551f..c7ed48f3b63 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala
@@ -36,7 +36,19 @@ trait GpuBatchScanExecMetrics extends GpuExec {
   lazy val fileCacheMetrics: Map[String, GpuMetric] = {
     // File cache only supported on Parquet files for now.
     scan match {
-      case _: GpuParquetScan | _: GpuOrcScan => createFileCacheMetrics()
+      case _: GpuParquetScan | _: GpuOrcScan =>
+        createFileCacheMetrics() ++
+          // For debugging ByteDance workloads
+          Map(
+            "compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
+            "unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
+            "nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
+            "maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
+            "minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
+            "maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
+            BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
+            BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
+            BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
       case _ => Map.empty
     }
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 2bec8bc581a..7fcf3dac7db 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -49,6 +49,9 @@ object MetricsLevel {
 object GpuMetric extends Logging {
   // Metric names.
   val BUFFER_TIME = "bufferTime"
+  val BUFFER_DATA_TIME = "bufferDataTime"
+  val BUFFER_META_TIME = "bufferMetaTime"
+  val BUFFER_RESIZE_TIME = "bufferResizeTime"
   val COPY_BUFFER_TIME = "copyBufferTime"
   val GPU_DECODE_TIME = "gpuDecodeTime"
   val NUM_INPUT_ROWS = "numInputRows"
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
index e4af2e4b6de..41597f49e44 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
@@ -756,6 +756,41 @@ trait DataBlockBase {
   def getReadDataSize: Long
   // the block size to be used to slice the whole HostMemoryBuffer
   def getBlockSize: Long
+
+  case class ColumnStats(compressedSize: Long,
+      uncompressedSize: Long,
+      nullCount: Long,
+      minFieldSize: Int,
+      maxFieldSize: Int)
+
+  protected def getColumnStatistics: Seq[ColumnStats] = Seq()
+
+  def updateMetrics(metrics: Map[String, GpuMetric]): Unit = {
+    getColumnStatistics match {
+      case stats if stats.isEmpty =>
+      case stats =>
+        var minR: Double = 1
+        var maxR: Double = 0
+        var maxFieldSize: Int = 0
+        stats.foreach { s =>
+          metrics("compPageSize") += s.compressedSize
+          if (s.uncompressedSize > 0) {
+            metrics("unCompPageSize") += s.uncompressedSize
+            val r: Double = s.compressedSize.toDouble / s.uncompressedSize.toDouble
+            maxR = maxR max r
+            minR = minR min r
+          }
+          maxFieldSize = maxFieldSize max s.maxFieldSize
+        }
+        metrics("nullCount") += stats.head.nullCount
+        metrics("maxCPR").set(metrics("maxCPR").value max ((1.0 - minR) * 100000L).toLong)
+        if (metrics("minCPR").value == 0L) {
+          metrics("minCPR").set(100000L)
+        }
+        metrics("minCPR").set(metrics("minCPR").value min ((1.0 - maxR) * 100000L).toLong)
+        metrics("maxFieldSize").set(metrics("maxFieldSize").value max (maxFieldSize * 10L))
+    }
+  }
 }
 
 /**
@@ -1058,12 +1093,16 @@ abstract class MultiFileCoalescingPartitionReaderBase(
     if (currentChunkMeta.currentChunk.isEmpty) {
       CachedGpuBatchIterator(EmptyTableReader, colTypes)
     } else {
+      currentChunkMeta.currentChunk.foreach(_._2.updateMetrics(metrics))
+
       val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk,
         currentChunkMeta.clippedSchema)
+
       if (dataSize == 0) {
         dataBuffer.close()
         CachedGpuBatchIterator(EmptyTableReader, colTypes)
       } else {
+        startNewBufferRetry
         RmmRapidsRetryIterator.withRetry(dataBuffer, chunkedSplit(_)) { _ =>
           // We don't want to actually close the host buffer until we know that we don't
@@ -1110,7 +1149,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
     val batchContext = createBatchContext(filesAndBlocks, clippedSchema)
     // First, estimate the output file size for the initial allocating.
     // the estimated size should be >= size of HEAD + Blocks + FOOTER
-    val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext)
+    val initTotalSize =
+      withResource(new NvtxWithMetrics("Buffer size eval", NvtxColor.ORANGE,
+        metrics("bufferMetaTime"))) { _ =>
+        calculateEstimatedBlocksOutputSize(batchContext)
+      }
     val (buffer, bufferSize, footerOffset, outBlocks) =
       closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
         // Second, write header
@@ -1129,15 +1172,21 @@ abstract class MultiFileCoalescingPartitionReaderBase(
           offset += fileBlockSize
         }
 
-        for (future <- tasks.asScala) {
-          val (blocks, bytesRead) = future.get()
-          allOutputBlocks ++= blocks
-          TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
+        withResource(new NvtxWithMetrics("Buffer read data", NvtxColor.PURPLE,
+          metrics("bufferDataTime"))) { _ =>
+          for (future <- tasks.asScala) {
+            val (blocks, bytesRead) = future.get()
+            allOutputBlocks ++= blocks
+            TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
+          }
         }
 
         // Fourth, calculate the final buffer size
-        val finalBufferSize = calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
-          batchContext)
+        val finalBufferSize = withResource(new NvtxWithMetrics("Buffer size eval",
+          NvtxColor.RED, metrics("bufferMetaTime"))) { _ =>
+          calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
+            batchContext)
+        }
 
         (hmb, finalBufferSize, offset, allOutputBlocks.toSeq)
       }
@@ -1154,14 +1203,17 @@ abstract class MultiFileCoalescingPartitionReaderBase(
         s"reallocating and copying data to bigger buffer size: $bufferSize")
     }
     // Copy the old buffer to a new allocated bigger buffer and close the old buffer
-    buf = withResource(buffer) { _ =>
-      withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
-        // realloc memory and copy
-        closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
-          withResource(new HostMemoryOutputStream(newhmb)) { out =>
-            IOUtils.copy(in, out)
+    buf = withResource(new NvtxWithMetrics("Buffer resize time",
+      NvtxColor.RED, metrics("bufferResizeTime"))) { _ =>
+      withResource(buffer) { _ =>
+        withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
+          // realloc memory and copy
+          closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
+            withResource(new HostMemoryOutputStream(newhmb)) { out =>
+              IOUtils.copy(in, out)
+            }
+            newhmb
           }
-          newhmb
         }
       }
     }
@@ -1175,9 +1227,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
     // Closing the original buf and returning a new allocated buffer is allowed, but there is no
    // reason to do that.
    // If you have to do this, please think about to add other abstract methods first.
-    val (finalBuffer, finalBufferSize) = writeFileFooter(buf, totalBufferSize, footerOffset,
-      outBlocks, batchContext)
-
+    val (finalBuffer, finalBufferSize) = withResource(new NvtxWithMetrics("Buffer write footer",
+      NvtxColor.WHITE, metrics("bufferMetaTime"))) { _ =>
+      writeFileFooter(buf, totalBufferSize, footerOffset,
+        outBlocks, batchContext)
+    }
     closeOnExcept(finalBuffer) { _ =>
       // triple check we didn't go over memory
       if (finalBufferSize > totalBufferSize) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 207b6ddaa9b..400703ad04b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -24,12 +24,10 @@ import java.nio.charset.StandardCharsets
 import java.util
 import java.util.{Collections, Locale}
 import java.util.concurrent._
-
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
-
 import ai.rapids.cudf._
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
 import com.nvidia.spark.rapids.GpuMetric._
@@ -46,6 +44,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path}
 import org.apache.parquet.bytes.BytesUtils
 import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian
 import org.apache.parquet.column.ColumnDescriptor
+import org.apache.parquet.column.statistics.{BinaryStatistics, BooleanStatistics, DoubleStatistics, FloatStatistics, IntStatistics, LongStatistics}
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
@@ -54,7 +53,6 @@ import org.apache.parquet.hadoop.metadata._
 import org.apache.parquet.io.{InputFile, SeekableInputStream}
 import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveType, Type}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
-
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
@@ -1596,6 +1594,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
     val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
     withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
       coalescedRanges.foreach { blockCopy =>
+
         totalBytesCopied += copyDataRange(blockCopy, in, out, copyBuffer)
       }
     }
@@ -1819,6 +1818,29 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockB
   override def getRowCount: Long = dataBlock.getRowCount
   override def getReadDataSize: Long = dataBlock.getTotalByteSize
   override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum
+
+  override protected def getColumnStatistics: Seq[ColumnStats] = {
+    dataBlock.getColumns.asScala.map { c =>
+      val (nullCnt, minField, maxField) = c.getStatistics match {
+        case s: BinaryStatistics =>
+          (s.getNumNulls, s.genericGetMin.length, s.genericGetMax.length)
+        case s: BooleanStatistics =>
+          (s.getNumNulls, 1, 1)
+        case s: IntStatistics =>
+          (s.getNumNulls, 4, 4)
+        case s: LongStatistics =>
+          (s.getNumNulls, 8, 8)
+        case s: FloatStatistics =>
+          (s.getNumNulls, 4, 4)
+        case s: DoubleStatistics =>
+          (s.getNumNulls, 8, 8)
+        case s =>
+          throw new Exception(s"Invalid value $s")
+      }
+      ColumnStats(c.getTotalSize, c.getTotalUncompressedSize,
+        nullCnt, minField, maxField)
+    }
+  }
 }
 
 /** Parquet extra information containing rebase modes and whether there is int96 timestamp */
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/util/rapids/ParquetMetaUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/util/rapids/ParquetMetaUtils.scala
new file mode 100644
index 00000000000..299d6b685b2
--- /dev/null
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/util/rapids/ParquetMetaUtils.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util.rapids
+
+import java.io.PrintWriter
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData}
+
+object ParquetMetaUtils {
+
+  def showDetails(out: PrintWriter, meta: BlockMetaData): Unit = {
+    showDetails(out, meta, None)
+  }
+
+  private def showDetails(out: PrintWriter, meta: BlockMetaData, num: Option[Long]): Unit = {
+    val rows = meta.getRowCount
+    val tbs = meta.getTotalByteSize
+    val offset = meta.getStartingPos
+    out.println(s"row group${num.fold("")(" " + _)}: RC:$rows TS:$tbs OFFSET:$offset")
+    out.println("-----------------------------")
+    showDetails(out, meta.getColumns.asScala)
+  }
+
+  private case class PathNode(value: Either[ColumnChunkMetaData,
+      mutable.LinkedHashMap[String, PathNode]])
+
+  def showDetails(out: PrintWriter, colChunkMeta: Seq[ColumnChunkMetaData]): Unit = {
+    val chunks = PathNode(Right(mutable.LinkedHashMap[String, PathNode]()))
+
+    colChunkMeta.foreach { meta =>
+      val paths = meta.getPath.toArray
+      var cursor = chunks
+      (0 until paths.length).foreach { i =>
+        cursor.value match {
+          case Right(children) if i == paths.length - 1 =>
+            children(paths(i)) = PathNode(Left(meta))
+          case Right(children) =>
+            cursor = children.getOrElseUpdate(paths(i),
+              PathNode(Right(mutable.LinkedHashMap[String, PathNode]())))
+          case Left(_) =>
+            throw new Exception("found leaf value in a non-leaf path")
+        }
+      }
+    }
+
+    showColumnChunkDetails(out, chunks, 0)
+  }
+
+  private def showColumnChunkDetails(out: PrintWriter, current: PathNode, depth: Int): Unit = {
+    current.value match {
+      case Right(children) =>
+        children.foreach { case (key, child) =>
+          val name = "." * depth + key
+          child.value match {
+            case Right(_) =>
+              out.println(s"$name: ")
+              showColumnChunkDetails(out, child, depth + 1)
+            case Left(meta) =>
+              out.print(s"$name: ")
+              showDetails(out, meta, includeName = false)
+          }
+        }
+      case Left(_) =>
+        throw new Exception("found leaf value in a non-leaf path")
+    }
+  }
+
+  private def showDetails(out: PrintWriter,
+      meta: ColumnChunkMetaData,
+      includeName: Boolean): Unit = {
+    val dictPageOff = meta.getDictionaryPageOffset
+    val firstPageOff = meta.getFirstDataPageOffset
+    val compSize = meta.getTotalSize
+    val unCompSize = meta.getTotalUncompressedSize
+    val count = meta.getValueCount
+    val ratio = unCompSize.toDouble / compSize
+    val encodings = meta.getEncodings.asScala.filterNot(_ == null).mkString(",")
+
+    if (includeName) {
+      val path = meta.getPath.asScala.filterNot(_ == null).mkString(".")
+      out.print(s"$path: ")
+    }
+    out.println(s" ${meta.getPrimitiveType} ${meta.getCodec} DO:$dictPageOff " +
+      s"FPO:$firstPageOff SZ:$compSize/$unCompSize/$ratio VC:$count ENC:$encodings")
+    val stats = Option(meta.getStatistics).map(_.toString).getOrElse("none")
+    out.println(s" ST:[$stats]")
+  }
+
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
index b4fca369f71..05151e481d7 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
@@ -429,7 +429,17 @@ case class GpuFileSourceScanExec(
     relation.fileFormat match {
       case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat =>
         Map(READ_FS_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_READ_FS_TIME),
-          WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME))
+          WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME),
+          // For debugging ByteDance workloads
+          "compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
+          "unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
+          "nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
+          "maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
+          "minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
+          "maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
+          BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
+          BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
+          BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
       case _ => Map.empty[String, GpuMetric]
     }
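
Usage note (not part of the patch): the new ParquetMetaUtils helper is only introduced by this diff and does not appear to be wired into a call site here. Below is a minimal sketch, under that assumption, of how it could be driven from an already-parsed Parquet footer; the object name DumpParquetMeta and its dump method are hypothetical names used purely for illustration.

    import java.io.PrintWriter

    import scala.collection.JavaConverters._

    import org.apache.parquet.hadoop.metadata.ParquetMetadata

    import org.apache.spark.sql.catalyst.util.rapids.ParquetMetaUtils

    // Hypothetical driver: print row-group and column-chunk details for every
    // block in a Parquet footer using the helper added by this patch.
    object DumpParquetMeta {
      def dump(footer: ParquetMetadata, out: PrintWriter): Unit = {
        footer.getBlocks.asScala.foreach { block =>
          ParquetMetaUtils.showDetails(out, block)
        }
        out.flush()
      }
    }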