Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for kudo write metrics #11784

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHea
import org.apache.spark.TaskContext

import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SER_STREAM_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME}
import org.apache.spark.sql.types.{DataType, NullType}
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -79,7 +79,7 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
}
}

private def readNextBatch(): ColumnarBatch = {
private def readNextBatch(): ColumnarBatch = deserTime.ns {
withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ =>
val header = nextHeader.get
nextHeader = None
Expand Down Expand Up @@ -143,8 +143,8 @@ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Arr
private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]) extends
SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)


override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
Expand Down Expand Up @@ -337,11 +337,11 @@ private class KudoSerializerInstance(
val metrics: Map[String, GpuMetric],
val dataTypes: Array[DataType]) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME)
private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME)
private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)

private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))

Expand Down Expand Up @@ -534,7 +534,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
}
}

private def readNextBatch(): ColumnarBatch = {
private def readNextBatch(): ColumnarBatch = deserTime.ns {
withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ =>
val header = nextHeader.get
nextHeader = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
extends RapidsShuffleWriter[K, V]
with RapidsShuffleWriterShimHelper {
private val metrics = handle.metrics
private val serializationTimeMetric =
metrics.get(METRIC_SHUFFLE_SERIALIZATION_TIME)
private val shuffleWriteTimeMetric =
metrics.get(METRIC_SHUFFLE_WRITE_TIME)
private val shuffleCombineTimeMetric =
Expand Down Expand Up @@ -428,9 +430,11 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
// counted in the ioTime
val totalPerRecordWriteTime = recordWriteTime.get() + ioTimeNs
val ioRatio = (ioTimeNs.toDouble/totalPerRecordWriteTime)
val serializationRatio = 1.0 - ioRatio

// update metrics, note that we expect them to be relative to the task
ioTimeMetric.foreach(_ += (ioRatio * writeTimeNs).toLong)
serializationTimeMetric.foreach(_ += (serializationRatio * writeTimeNs).toLong)
// we add all three here because this metric is meant to show the time
// we are blocked on writes
shuffleWriteTimeMetric.foreach(_ += (writeTimeNs + combineTimeNs))
Expand Down Expand Up @@ -593,6 +597,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](

private val sqlMetrics = handle.metrics
private val dep = handle.dependency
private val deserializationTimeNs = sqlMetrics.get(METRIC_SHUFFLE_DESERIALIZATION_TIME)
private val shuffleReadTimeNs = sqlMetrics.get(METRIC_SHUFFLE_READ_TIME)
private val dataReadSize = sqlMetrics.get(METRIC_DATA_READ_SIZE)

Expand Down Expand Up @@ -675,6 +680,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
}
val res = currentIter.next()
val fetchTime = System.nanoTime() - fetchTimeStart
deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime))
shuffleReadTimeNs.foreach(_ += fetchTime)
res
}
Expand Down Expand Up @@ -847,6 +853,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
case _ => 0 // TODO: do we need to handle other types here?
}
waitTime += System.nanoTime() - waitTimeStart
deserializationTimeNs.foreach(_ += waitTime)
shuffleReadTimeNs.foreach(_ += waitTime)
res
}
Expand Down Expand Up @@ -952,6 +959,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
}
// keep track of the overall metric which includes blocked time
val fetchTime = System.nanoTime() - fetchTimeStart
deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime))
shuffleReadTimeNs.foreach(_ += fetchTime)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuShuffleDependency
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{createAdditionalExchangeMetris, METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.MutablePair
Expand Down Expand Up @@ -198,7 +198,7 @@ abstract class GpuShuffleExchangeExecBase(
lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val additionalMetrics : Map[String, GpuMetric] = {
createAdditionalExchangeMetris(this) ++
createAdditionalExchangeMetrics(this) ++
GpuMetric.wrap(readMetrics) ++
GpuMetric.wrap(writeMetrics)
}
Expand Down Expand Up @@ -270,27 +270,32 @@ object GpuShuffleExchangeExecBase {
val METRIC_DATA_READ_SIZE = "dataReadSize"
val METRIC_DESC_DATA_READ_SIZE = "data read size"
val METRIC_SHUFFLE_SERIALIZATION_TIME = "rapidsShuffleSerializationTime"
val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rs. serialization time"
val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rapids shuffle serialization time"
[Review note: reviewer jlowe marked the conversation on the metric descriptions above as resolved.]
val METRIC_SHUFFLE_SER_STREAM_TIME = "rapidsShuffleSerializationStreamTime"
val METRIC_DESC_SHUFFLE_SER_STREAM_TIME = "rapids shuffle serialization to output stream time"
val METRIC_SHUFFLE_DESERIALIZATION_TIME = "rapidsShuffleDeserializationTime"
val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "rs. deserialization time"
val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "rapids shuffle deserialization time"
val METRIC_SHUFFLE_DESER_STREAM_TIME = "rapidsShuffleDeserializationStreamTime"
val METRIC_DESC_SHUFFLE_DESER_STREAM_TIME =
"rapids shuffle deserialization from input stream time"
val METRIC_SHUFFLE_PARTITION_TIME = "rapidsShufflePartitionTime"
val METRIC_DESC_SHUFFLE_PARTITION_TIME = "rs. partition time"
val METRIC_DESC_SHUFFLE_PARTITION_TIME = "rapids shuffle partition time"
val METRIC_SHUFFLE_WRITE_TIME = "rapidsShuffleWriteTime"
val METRIC_DESC_SHUFFLE_WRITE_TIME = "rs. shuffle write time"
val METRIC_DESC_SHUFFLE_WRITE_TIME = "rapids shuffle shuffle write time"
val METRIC_SHUFFLE_COMBINE_TIME = "rapidsShuffleCombineTime"
val METRIC_DESC_SHUFFLE_COMBINE_TIME = "rs. shuffle combine time"
val METRIC_DESC_SHUFFLE_COMBINE_TIME = "rapids shuffle shuffle combine time"
val METRIC_SHUFFLE_WRITE_IO_TIME = "rapidsShuffleWriteIoTime"
val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "rs. shuffle write io time"
val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "rapids shuffle shuffle write io time"
val METRIC_SHUFFLE_READ_TIME = "rapidsShuffleReadTime"
val METRIC_DESC_SHUFFLE_READ_TIME = "rs. shuffle read time"
val METRIC_DESC_SHUFFLE_READ_TIME = "rapids shuffle shuffle read time"
val METRIC_SHUFFLE_SER_CALC_HEADER_TIME = "rapidsShuffleSerializationCalcHeaderTime"
val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "rs. serialization calc header time"
val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "rapids shuffle serialization calc header time"
val METRIC_SHUFFLE_SER_COPY_HEADER_TIME = "rapidsShuffleSerializationCopyHeaderTime"
val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "rs. serialization copy header time"
val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "rapids shuffle serialization copy header time"
val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rs. serialization copy buffer time"
val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapids shuffle serialization copy buffer time"

def createAdditionalExchangeMetris(gpu: GpuExec): Map[String, GpuMetric] = Map(
def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
// dataSize and dataReadSize are uncompressed, one is on write and the
// other on read
METRIC_DATA_SIZE -> gpu.createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE),
Expand Down Expand Up @@ -356,9 +361,9 @@ object GpuShuffleExchangeExecBase {
rdd
}
val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
val partitonTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
def getPartitioned: ColumnarBatch => Any = {
batch => partitonTime.ns {
batch => partitionTime.ns {
partitioner.columnarEvalAny(batch)
}
}
Expand Down
Loading