Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics GpuPartitioning.CopyToHostTime #11882

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ object GpuMetric extends Logging {
val DELETION_VECTOR_SIZE = "deletionVectorSize"
val CONCAT_HEADER_TIME = "concatHeaderTime"
val CONCAT_BUFFER_TIME = "concatBufferTime"
val COPY_TO_HOST_TIME = "d2hMemCopyTime"

// Metric Descriptions.
val DESCRIPTION_BUFFER_TIME = "buffer time"
Expand Down Expand Up @@ -133,6 +134,7 @@ object GpuMetric extends Logging {
val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"

def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,23 @@ trait GpuPartitioning extends Partitioning {
val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize

val hostPartColumns = withResource(partitionColumns) { _ =>
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
// We have to wrap the NvtxWithMetrics over both copyToHostAsync and corresponding CudaSync,
// because the copyToHostAsync calls above are not guaranteed to be asynchronous (e.g.: when
// the copy is from pageable memory, and we're not guaranteed to be using pinned memory).
val hostPartColumns = withResource(
new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
val hostColumns = withResource(partitionColumns) { _ =>
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
}
}
closeOnExcept(hostColumns) { _ =>
Cuda.DEFAULT_STREAM.sync()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the only time spent on copy to host. The copyToHostAsync calls above are not guaranteed to be asynchronous (e.g.: when the copy is from pageable memory, and we're not guaranteed to be using pinned memory). Therefore the metric and NVTX range needs to cover the copyToHostAsync calls above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refined the code to wrap them all.

}
hostColumns
}

withResource(hostPartColumns) { _ =>
Cuda.DEFAULT_STREAM.sync()
// Leaving the GPU for a while
GpuSemaphore.releaseIfNecessary(TaskContext.get())

Expand Down Expand Up @@ -241,4 +251,14 @@ trait GpuPartitioning extends Partitioning {
}
}
}

private var memCopyTime: GpuMetric = NoopMetric

/**
* Setup sub-metrics for the performance debugging of GpuPartition. This method is expected to
* be called at the query planning stage. Therefore, this method is NOT thread safe.
*/
def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
metrics.get(GpuMetric.COPY_TO_HOST_TIME).foreach(memCopyTime = _)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ abstract class GpuShuffleExchangeExecBase(
PARTITION_SIZE -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_PARTITION_SIZE),
NUM_PARTITIONS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_PARTITIONS),
NUM_OUTPUT_ROWS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_OUTPUT_ROWS),
NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES)
NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES),
COPY_TO_HOST_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_COPY_TO_HOST_TIME)
) ++ additionalMetrics

override def nodeName: String = "GpuColumnarExchange"
Expand Down Expand Up @@ -364,6 +365,12 @@ object GpuShuffleExchangeExecBase {
rdd
}
val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
// Inject debugging subMetrics, such as D2HTime before SliceOnCpu
// The injected metrics will be serialized as the members of GpuPartitioning
partitioner match {
case pt: GpuPartitioning => pt.setupDebugMetrics(metrics)
case _ =>
}
val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
def getPartitioned: ColumnarBatch => Any = {
batch => partitionTime.ns {
Expand Down
Loading