Skip to content

Commit

Permalink
Dump Parquet Meta as SparkMetrics
Browse files Browse the repository at this point in the history
Signed-off-by: sperlingxx <[email protected]>
  • Loading branch information
sperlingxx committed Jan 16, 2024
1 parent 6422930 commit 9b29da6
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ trait GpuBatchScanExecMetrics extends GpuExec {
lazy val fileCacheMetrics: Map[String, GpuMetric] = {
// File cache only supported on Parquet files for now.
scan match {
case _: GpuParquetScan | _: GpuOrcScan => createFileCacheMetrics()
case _: GpuParquetScan | _: GpuOrcScan =>
createFileCacheMetrics() ++
// For debugging ByteDance workloads
Map(
"compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
"unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
"nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
"maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
"minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
"maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
case _ => Map.empty
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ object MetricsLevel {
object GpuMetric extends Logging {
// Metric names.
val BUFFER_TIME = "bufferTime"
val BUFFER_DATA_TIME = "bufferDataTime"
val BUFFER_META_TIME = "bufferMetaTime"
val BUFFER_RESIZE_TIME = "bufferResizeTime"
val COPY_BUFFER_TIME = "copyBufferTime"
val GPU_DECODE_TIME = "gpuDecodeTime"
val NUM_INPUT_ROWS = "numInputRows"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,41 @@ trait DataBlockBase {
def getReadDataSize: Long
// the block size to be used to slice the whole HostMemoryBuffer
def getBlockSize: Long

case class ColumnStats(compressedSize: Long,
uncompressedSize: Long,
nullCount: Long,
minFieldSize: Int,
maxFieldSize: Int)

protected def getColumnStatistics: Seq[ColumnStats] = Seq()

def updateMetrics(metrics: Map[String, GpuMetric]): Unit = {
getColumnStatistics match {
case stats if stats.isEmpty =>
case stats =>
var minR: Double = 1
var maxR: Double = 0
var maxFieldSize: Int = 0
stats.foreach { s =>
metrics("compPageSize") += s.compressedSize
if (s.uncompressedSize > 0) {
metrics("unCompPageSize") += s.uncompressedSize
val r: Double = s.compressedSize.toDouble / s.uncompressedSize.toDouble
maxR = maxR max r
minR = minR min r
}
maxFieldSize = maxFieldSize max s.maxFieldSize
}
metrics("nullCount") += stats.head.nullCount
metrics("maxCPR").set(metrics("maxCPR").value max ((1.0 - minR) * 100000L).toLong)
if (metrics("minCPR").value == 0L) {
metrics("minCPR").set(100000L)
}
metrics("minCPR").set(metrics("minCPR").value min ((1.0 - maxR) * 100000L).toLong)
metrics("maxFieldSize").set(metrics("maxFieldSize").value max (maxFieldSize * 10L))
}
}
}

/**
Expand Down Expand Up @@ -1058,12 +1093,16 @@ abstract class MultiFileCoalescingPartitionReaderBase(
if (currentChunkMeta.currentChunk.isEmpty) {
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
currentChunkMeta.currentChunk.foreach(_._2.updateMetrics(metrics))

val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk,
currentChunkMeta.clippedSchema)

if (dataSize == 0) {
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {

startNewBufferRetry
RmmRapidsRetryIterator.withRetry(dataBuffer, chunkedSplit(_)) { _ =>
// We don't want to actually close the host buffer until we know that we don't
Expand Down Expand Up @@ -1110,7 +1149,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
val batchContext = createBatchContext(filesAndBlocks, clippedSchema)
// First, estimate the output file size for the initial allocating.
// the estimated size should be >= size of HEAD + Blocks + FOOTER
val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext)
val initTotalSize =
withResource(new NvtxWithMetrics("Buffer size eval", NvtxColor.ORANGE,
metrics("bufferMetaTime"))) { _ =>
calculateEstimatedBlocksOutputSize(batchContext)
}
val (buffer, bufferSize, footerOffset, outBlocks) =
closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
// Second, write header
Expand All @@ -1129,15 +1172,21 @@ abstract class MultiFileCoalescingPartitionReaderBase(
offset += fileBlockSize
}

for (future <- tasks.asScala) {
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
withResource(new NvtxWithMetrics("Buffer read data", NvtxColor.PURPLE,
metrics("bufferDataTime"))) { _ =>
for (future <- tasks.asScala) {
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
}
}

// Fourth, calculate the final buffer size
val finalBufferSize = calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
batchContext)
val finalBufferSize = withResource(new NvtxWithMetrics("Buffer size eval",
NvtxColor.RED, metrics("bufferMetaTime"))) { _ =>
calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
batchContext)
}

(hmb, finalBufferSize, offset, allOutputBlocks.toSeq)
}
Expand All @@ -1154,14 +1203,17 @@ abstract class MultiFileCoalescingPartitionReaderBase(
s"reallocating and copying data to bigger buffer size: $bufferSize")
}
// Copy the old buffer to a new allocated bigger buffer and close the old buffer
buf = withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
buf = withResource(new NvtxWithMetrics("Buffer resize time",
NvtxColor.RED, metrics("bufferResizeTime"))) { _ =>
withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
}
newhmb
}
newhmb
}
}
}
Expand All @@ -1175,9 +1227,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
// Closing the original buf and returning a new allocated buffer is allowed, but there is no
// reason to do that.
// If you have to do this, please think about to add other abstract methods first.
val (finalBuffer, finalBufferSize) = writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, batchContext)

val (finalBuffer, finalBufferSize) = withResource(new NvtxWithMetrics("Buffer write footer",
NvtxColor.WHITE, metrics("bufferMetaTime"))) { _ =>
writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, batchContext)
}
closeOnExcept(finalBuffer) { _ =>
// triple check we didn't go over memory
if (finalBufferSize > totalBufferSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Locale}
import java.util.concurrent._

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

import ai.rapids.cudf._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric._
Expand All @@ -46,6 +44,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian
import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.column.statistics.{BinaryStatistics, BooleanStatistics, DoubleStatistics, FloatStatistics, IntStatistics, LongStatistics}
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
Expand All @@ -54,7 +53,6 @@ import org.apache.parquet.hadoop.metadata._
import org.apache.parquet.io.{InputFile, SeekableInputStream}
import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveType, Type}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -1596,6 +1594,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
coalescedRanges.foreach { blockCopy =>

totalBytesCopied += copyDataRange(blockCopy, in, out, copyBuffer)
}
}
Expand Down Expand Up @@ -1819,6 +1818,29 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockB
override def getRowCount: Long = dataBlock.getRowCount
override def getReadDataSize: Long = dataBlock.getTotalByteSize
override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum

override protected def getColumnStatistics: Seq[ColumnStats] = {
dataBlock.getColumns.asScala.map { c =>
val (nullCnt, minField, maxField) = c.getStatistics match {
case s: BinaryStatistics =>
(s.getNumNulls, s.genericGetMin.length, s.genericGetMax.length)
case s: BooleanStatistics =>
(s.getNumNulls, 1, 1)
case s: IntStatistics =>
(s.getNumNulls, 4, 4)
case s: LongStatistics =>
(s.getNumNulls, 8, 8)
case s: FloatStatistics =>
(s.getNumNulls, 4, 4)
case s: DoubleStatistics =>
(s.getNumNulls, 8, 8)
case s =>
throw new Exception(s"Invalid value $s")
}
ColumnStats(c.getTotalSize, c.getTotalUncompressedSize,
nullCnt, minField, maxField)
}
}
}

/** Parquet extra information containing rebase modes and whether there is int96 timestamp */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util.rapids

import java.io.PrintWriter

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData}

object ParquetMetaUtils {

def showDetails(out: PrintWriter, meta: BlockMetaData): Unit = {
showDetails(out, meta, None)
}

private def showDetails(out: PrintWriter, meta: BlockMetaData, num: Option[Long]): Unit = {
val rows = meta.getRowCount
val tbs = meta.getTotalByteSize
val offset = meta.getStartingPos
out.println(s"row group${num.fold("")(" " + _)}: RC:$rows TS:$tbs OFFSET:$offset")
out.println("-----------------------------")
showDetails(out, meta.getColumns.asScala)
}

private case class PathNode(value: Either[ColumnChunkMetaData,
mutable.LinkedHashMap[String, PathNode]])

def showDetails(out: PrintWriter, colChunkMeta: Seq[ColumnChunkMetaData]): Unit = {
val chunks = PathNode(Right(mutable.LinkedHashMap[String, PathNode]()))

colChunkMeta.foreach { meta =>
val paths = meta.getPath.toArray
var cursor = chunks
(0 until paths.length).foreach { i =>
cursor.value match {
case Right(children) if i == paths.length - 1 =>
children(paths(i)) = PathNode(Left(meta))
case Right(children) =>
cursor = children.getOrElseUpdate(paths(i),
PathNode(Right(mutable.LinkedHashMap[String, PathNode]())))
case Left(_) =>
throw new Exception("found leaf value in a non-leaf path")
}
}
}

showColumnChunkDetails(out, chunks, 0)
}

private def showColumnChunkDetails(out: PrintWriter, current: PathNode, depth: Int): Unit = {
current.value match {
case Right(children) =>
children.foreach { case (key, child) =>
val name = "." * depth + key
child.value match {
case Right(_) =>
out.println(s"$name: ")
showColumnChunkDetails(out, child, depth + 1)
case Left(meta) =>
out.print(s"$name: ")
showDetails(out, meta, includeName = false)
}
}
case Left(_) =>
throw new Exception("found leaf value in a non-leaf path")
}
}

private def showDetails(out: PrintWriter,
meta: ColumnChunkMetaData,
includeName: Boolean): Unit = {
val dictPageOff = meta.getDictionaryPageOffset
val firstPageOff = meta.getFirstDataPageOffset
val compSize = meta.getTotalSize
val unCompSize = meta.getTotalUncompressedSize
val count = meta.getValueCount
val ratio = unCompSize.toDouble / compSize
val encodings = meta.getEncodings.asScala.filterNot(_ == null).mkString(",")

if (includeName) {
val path = meta.getPath.asScala.filterNot(_ == null).mkString(".")
out.print(s"$path: ")
}
out.println(s" ${meta.getPrimitiveType} ${meta.getCodec} DO:$dictPageOff " +
s"FPO:$firstPageOff SZ:$compSize/$unCompSize/$ratio VC:$count ENC:$encodings")
val stats = Option(meta.getStatistics).map(_.toString).getOrElse("none")
out.println(s" ST:[$stats]")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,17 @@ case class GpuFileSourceScanExec(
relation.fileFormat match {
case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat =>
Map(READ_FS_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_READ_FS_TIME),
WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME))
WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME),
// For debugging ByteDance workloads
"compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
"unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
"nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
"maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
"minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
"maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
case _ =>
Map.empty[String, GpuMetric]
}
Expand Down

0 comments on commit 9b29da6

Please sign in to comment.