diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 850a04f390f..bc67366d347 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -92,6 +92,7 @@ object GpuMetric extends Logging {
   val DELETION_VECTOR_SIZE = "deletionVectorSize"
   val CONCAT_HEADER_TIME = "concatHeaderTime"
   val CONCAT_BUFFER_TIME = "concatBufferTime"
+  val COPY_TO_HOST_TIME = "d2hMemCopyTime"

   // Metric Descriptions.
   val DESCRIPTION_BUFFER_TIME = "buffer time"
@@ -133,6 +134,7 @@ object GpuMetric extends Logging {
   val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
   val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
   val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
+  val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"

   def unwrap(input: GpuMetric): SQLMetric = input match {
     case w :WrappedGpuMetric => w.sqlMetric
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index 6394e2974b4..4fbc612591b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -126,13 +126,23 @@ trait GpuPartitioning extends Partitioning {
     val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
     val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize

-    val hostPartColumns = withResource(partitionColumns) { _ =>
-      withRetryNoSplit {
-        partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+    // We have to wrap the NvtxWithMetrics over both copyToHostAsync and the corresponding
+    // CudaSync, because the copyToHostAsync calls below are not guaranteed to be asynchronous
+    // (e.g., when the copy is from pageable memory, and we're not guaranteed to use pinned memory).
+    val hostPartColumns = withResource(
+      new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      val hostColumns = withResource(partitionColumns) { _ =>
+        withRetryNoSplit {
+          partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+        }
+      }
+      closeOnExcept(hostColumns) { _ =>
+        Cuda.DEFAULT_STREAM.sync()
       }
+      hostColumns
     }
+
     withResource(hostPartColumns) { _ =>
-      Cuda.DEFAULT_STREAM.sync()
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())

@@ -241,4 +251,14 @@ trait GpuPartitioning extends Partitioning {
       }
     }
   }
+
+  private var memCopyTime: GpuMetric = NoopMetric
+
+  /**
+   * Set up sub-metrics for performance debugging of GpuPartitioning. This method is expected to
+   * be called at the query planning stage. Therefore, this method is NOT thread safe.
+   */
+  def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
+    metrics.get(GpuMetric.COPY_TO_HOST_TIME).foreach(memCopyTime = _)
+  }
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index 332545a99e1..0e1b857317c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -208,7 +208,8 @@ abstract class GpuShuffleExchangeExecBase(
     PARTITION_SIZE -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_PARTITION_SIZE),
     NUM_PARTITIONS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_PARTITIONS),
     NUM_OUTPUT_ROWS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_OUTPUT_ROWS),
-    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES)
+    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES),
+    COPY_TO_HOST_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_COPY_TO_HOST_TIME)
   ) ++ additionalMetrics

   override def nodeName: String = "GpuColumnarExchange"
@@ -364,6 +365,12 @@ object GpuShuffleExchangeExecBase {
       rdd
     }
     val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
+    // Inject the debugging sub-metrics, such as the D2H copy time, before slicing on the CPU.
+    // The injected metrics will be serialized as members of GpuPartitioning.
+    partitioner match {
+      case pt: GpuPartitioning => pt.setupDebugMetrics(metrics)
+      case _ =>
+    }
     val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
     def getPartitioned: ColumnarBatch => Any = { batch =>
       partitionTime.ns {
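A minimal standalone sketch of the timing rationale behind the GpuPartitioning change: the metric must bracket both the async copy enqueue and the stream sync, otherwise a truly asynchronous copy records roughly zero time. This is plain Scala with no spark-rapids or CUDA dependency; copyToHostAsync and streamSync are hypothetical stand-ins for the Cuda.DEFAULT_STREAM operations in the patch.

// Standalone sketch, not spark-rapids code: copyToHostAsync and streamSync
// are hypothetical stand-ins for the CUDA stream calls used in the patch.
object D2HTimingSketch {
  // Returns immediately in the async case; may block if the copy degrades
  // to a synchronous (pageable-memory) copy.
  def copyToHostAsync(): Unit = ()

  // Blocks until all work queued on the stream has finished.
  def streamSync(): Unit = Thread.sleep(5)

  def main(args: Array[String]): Unit = {
    // Misleading: stops the clock before the copy is known to be done, so an
    // async copy records ~0 ns even though bytes are still moving.
    var t = System.nanoTime()
    copyToHostAsync()
    val enqueueOnly = System.nanoTime() - t

    // What the patch measures: enqueue plus sync, i.e. the end-to-end D2H
    // time, valid whether or not the copy was actually asynchronous.
    t = System.nanoTime()
    copyToHostAsync()
    streamSync()
    val endToEnd = System.nanoTime() - t

    println(s"enqueue-only: ${enqueueOnly}ns, end-to-end: ${endToEnd}ns")
  }
}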
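And a sketch of the injection pattern behind setupDebugMetrics: the partitioner defaults to a no-op metric so task-time code never checks for null, and the exchange node swaps in the real metric once at planning time. The Metric, NoopMetric, and NanoTimingMetric types below are simplified stand-ins, not the actual GpuMetric API.

// Standalone sketch of the NoopMetric-default / planning-time injection
// pattern; these types are simplified stand-ins for GpuMetric.
trait Metric { def add(ns: Long): Unit }
object NoopMetric extends Metric { def add(ns: Long): Unit = () }
final class NanoTimingMetric extends Metric {
  private var total = 0L
  def add(ns: Long): Unit = total += ns
  def value: Long = total
}

class Partitioner {
  // Defaults to a no-op sink, so timing code needs no null check even when
  // the metric was never registered (e.g. DEBUG-level metrics disabled).
  private var memCopyTime: Metric = NoopMetric

  // Called once at planning time (single-threaded), mirroring the patch.
  def setupDebugMetrics(metrics: Map[String, Metric]): Unit =
    metrics.get("d2hMemCopyTime").foreach(memCopyTime = _)

  // Task-time helper: times a copy and feeds the duration into the metric.
  def timeCopy[A](copy: => A): A = {
    val start = System.nanoTime()
    val result = copy
    memCopyTime.add(System.nanoTime() - start)
    result
  }
}

object Wiring extends App {
  val metrics = Map[String, Metric]("d2hMemCopyTime" -> new NanoTimingMetric)
  val p = new Partitioner
  p.setupDebugMetrics(metrics) // planning stage
  p.timeCopy(Thread.sleep(1))  // task stage: copy time flows into the metric
}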