Disk spill metric #11564

Merged: 10 commits, Oct 8, 2024

This PR tracks, per task, how many bytes are currently spilled to disk and reports the peak as a new gpuMaxDiskMemoryBytes task metric alongside the existing gpuMaxDeviceMemoryBytes. It also renames updateMaxGpuMemory to updateMaxMemory, since that method now flushes both watermarks, and hardens TrampolineUtil.incTaskMetricsDiskBytesSpilled against a null TaskMetrics.
Changes from all commits:
GpuSemaphore.scala:

```diff
@@ -381,7 +381,7 @@ private final class GpuSemaphore() extends Logging {
   def completeTask(context: TaskContext): Unit = {
     val taskAttemptId = context.taskAttemptId()
     GpuTaskMetrics.get.updateRetry(taskAttemptId)
-    GpuTaskMetrics.get.updateMaxGpuMemory(taskAttemptId)
+    GpuTaskMetrics.get.updateMaxMemory(taskAttemptId)
     val refs = tasks.remove(taskAttemptId)
     if (refs == null) {
       throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
```
RapidsDiskStore.scala:

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,8 +28,9 @@ import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
 import org.apache.commons.io.IOUtils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.rapids.{GpuTaskMetrics, RapidsDiskBlockManager}
-import org.apache.spark.sql.rapids.execution.SerializedHostTableUtils
+import org.apache.spark.sql.rapids.execution.{SerializedHostTableUtils, TrampolineUtil}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -38,6 +39,13 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
   extends RapidsBufferStoreWithoutSpill(StorageTier.DISK) {
   private[this] val sharedBufferFiles = new ConcurrentHashMap[RapidsBufferId, File]
 
+  private def reportDiskAllocMetrics(metrics: GpuTaskMetrics): String = {
+    val taskId = TaskContext.get().taskAttemptId()
+    val totalSize = metrics.getDiskBytesAllocated
+    val maxSize = metrics.getMaxDiskBytesAllocated
+    s"total size for task $taskId is $totalSize, max size is $maxSize"
+  }
+
   override protected def createBuffer(
       incoming: RapidsBuffer,
       catalog: RapidsBufferCatalog,
@@ -58,7 +66,6 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
     } else {
       writeToFile(incoming, path, append = false, stream)
     }
-
     logDebug(s"Spilled to $path $fileOffset:$diskLength")
     val buff = incoming match {
       case _: RapidsHostBatchBuffer =>
@@ -79,6 +86,12 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
         incoming.meta,
         incoming.getSpillPriority)
     }
+    TrampolineUtil.incTaskMetricsDiskBytesSpilled(uncompressedSize)
+
+    val metrics = GpuTaskMetrics.get
+    metrics.incDiskBytesAllocated(uncompressedSize)
+    logDebug(s"acquiring resources for disk buffer $id of size $uncompressedSize bytes")
+    logDebug(reportDiskAllocMetrics(metrics))
     Some(buff)
   }
 
@@ -181,6 +194,11 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
   }
 
   override protected def releaseResources(): Unit = {
+    logDebug(s"releasing resources for disk buffer $id of size $memoryUsedBytes bytes")
+    val metrics = GpuTaskMetrics.get
+    metrics.decDiskBytesAllocated(memoryUsedBytes)
+    logDebug(reportDiskAllocMetrics(metrics))
+
     // Buffers that share paths must be cleaned up elsewhere
     if (id.canShareDiskPaths) {
       sharedBufferFiles.remove(id)
```
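The createBuffer/releaseResources pair above amounts to a small high-watermark counter: each disk write bumps a running per-task total, each buffer release decrements it (clamped at zero), and the peak is captured along the way. A self-contained sketch of that pattern, with hypothetical names standing in for the GpuTaskMetrics fields:

```scala
// Minimal sketch of the allocate/release high-watermark pattern used above.
// DiskBytesTracker is a hypothetical stand-in, not the plugin's API.
class DiskBytesTracker {
  private var allocated: Long = 0L
  private var maxAllocated: Long = 0L

  def inc(bytes: Long): Unit = {
    allocated += bytes
    maxAllocated = math.max(maxAllocated, allocated)
  }

  def dec(bytes: Long): Unit = {
    // Clamp at zero: a release may arrive for bytes this task never counted.
    allocated = math.max(allocated - bytes, 0L)
  }

  def current: Long = allocated
  def max: Long = maxAllocated
}

object DiskBytesTrackerDemo extends App {
  val t = new DiskBytesTracker
  t.inc(100); t.inc(50) // two spills: 150 bytes on disk, peak 150
  t.dec(100)            // one buffer released: 50 on disk, peak still 150
  println(s"current=${t.current}, max=${t.max}") // current=50, max=150
}
```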
RapidsHostMemoryStore.scala:

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,8 +28,8 @@ import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY
 import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
 
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.rapids.GpuTaskMetrics
-import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.rapids.storage.RapidsStorageUtils
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -99,8 +99,8 @@ class RapidsHostMemoryStore(
     } else {
       val amountSpilled = synchronousSpill(targetTotalSize, catalog, stream)
       if (amountSpilled != 0) {
-        logDebug(s"Spilled $amountSpilled bytes from ${name} to make room for ${buffer.id}")
-        TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled)
+        logDebug(s"Task ${TaskContext.get.taskAttemptId()} spilled $amountSpilled bytes from " +
+          s"${name} to make room for ${buffer.id}")
       }
       // if after spill we can fit the new buffer, return true
       buffer.memoryUsedBytes <= (ms - currentSize)
```
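One detail worth care in the reworked log line: when an interpolated string is split across two concatenated fragments, the separating space has to survive the split or the pieces run together. A tiny demonstration of the pitfall, with illustrative values:

```scala
object ConcatDemo extends App {
  val name = "host store"  // hypothetical values, for illustration only
  val id = "buffer-42"
  // No space at the end of the first fragment: prints "...bytes fromhost store..."
  val bad = s"spilled 100 bytes from" + s"$name to make room for $id"
  // Keeping the trailing space preserves the word boundary.
  val good = s"spilled 100 bytes from " + s"$name to make room for $id"
  println(bad)
  println(good)
}
```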
GpuTaskMetrics.scala:

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -121,6 +121,26 @@ class GpuTaskMetrics extends Serializable {
   private val readSpillFromDiskTimeNs = new NanoSecondAccumulator
 
   private val maxDeviceMemoryBytes = new HighWatermarkAccumulator
+  private val maxDiskMemoryBytes = new HighWatermarkAccumulator
+
+  private var diskBytesAllocated: Long = 0
+  private var maxDiskBytesAllocated: Long = 0
+
+  def getDiskBytesAllocated: Long = diskBytesAllocated
+
+  def getMaxDiskBytesAllocated: Long = maxDiskBytesAllocated
+
+  def incDiskBytesAllocated(bytes: Long): Unit = {
+    diskBytesAllocated += bytes
+    maxDiskBytesAllocated = maxDiskBytesAllocated.max(diskBytesAllocated)
+  }
+
+  def decDiskBytesAllocated(bytes: Long): Unit = {
+    diskBytesAllocated -= bytes
+    // For some reason it is possible for a task to start out by releasing resources,
+    // possibly from a previous task; in that case we should just ignore it.
+    diskBytesAllocated = diskBytesAllocated.max(0)
+  }
+
   private val metrics = Map[String, AccumulatorV2[_, _]](
     "gpuSemaphoreWait" -> semWaitTimeNs,
@@ -132,7 +152,8 @@ class GpuTaskMetrics extends Serializable {
     "gpuSpillToDiskTime" -> spillToDiskTimeNs,
     "gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
     "gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
-    "gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes
+    "gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes,
+    "gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes
   )
 
   def register(sc: SparkContext): Unit = {
@@ -211,16 +232,19 @@ class GpuTaskMetrics extends Serializable {
     }
   }
 
-  def updateMaxGpuMemory(taskAttemptId: Long): Unit = {
+  def updateMaxMemory(taskAttemptId: Long): Unit = {
     val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
     if (maxMem > 0) {
-      // This metric tracks the max amount of memory that is allocated on the gpu during
-      // the lifespan of a task. However, this update function only gets called once on task
-      // completion, whereas the actual logic tracking of the max value during memory allocations
-      // lives in the JNI. Therefore, we can stick the convention here of calling the add method
-      // instead of adding a dedicated max method to the accumulator.
+      // These metrics track the max amount of memory that is allocated on the gpu and disk,
+      // respectively, during the lifespan of a task. However, this update function only gets
+      // called once on task completion, whereas the actual logic tracking of the max value
+      // during memory allocations lives in the JNI. Therefore, we stick with the convention
+      // of calling the add method instead of adding a dedicated max method to the accumulator.
       maxDeviceMemoryBytes.add(maxMem)
     }
+    if (maxDiskBytesAllocated > 0) {
+      maxDiskMemoryBytes.add(maxDiskBytesAllocated)
+    }
   }
 }
```
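The new watermark is registered as gpuMaxDiskMemoryBytes, but HighWatermarkAccumulator itself is not part of this diff. As orientation only, here is a hedged sketch of what a max-keeping AccumulatorV2 can look like (a hypothetical stand-in, not the plugin's implementation), which also illustrates why updateMaxMemory can simply call add once per task:

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical max-keeping accumulator: "add" records a maximum rather than
// a sum, so a caller that reports a per-task peak once can just call add.
class MaxAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  private var _max: Long = 0L

  override def isZero: Boolean = _max == 0L

  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }

  override def reset(): Unit = { _max = 0L }

  // Keep the maximum of everything reported so far.
  override def add(v: java.lang.Long): Unit = { _max = math.max(_max, v) }

  // Merging partial results from different tasks also takes the maximum.
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = {
    _max = math.max(_max, other.value)
  }

  override def value: java.lang.Long = _max
}
```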
TrampolineUtil.scala:

```diff
@@ -123,7 +123,12 @@ object TrampolineUtil {
    * @param amountSpilled amount of memory spilled in bytes
    */
  def incTaskMetricsDiskBytesSpilled(amountSpilled: Long): Unit = {
-    Option(TaskContext.get).foreach(_.taskMetrics().incDiskBytesSpilled(amountSpilled))
+    Option(TaskContext.get).foreach(tc => {
+      val metrics = tc.taskMetrics()
+      if (metrics != null) {
+        metrics.incDiskBytesSpilled(amountSpilled)
+      }
+    })
  }
 
  /**
```
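Both guards in the new body matter: TaskContext.get returns null when no task is running, and this change additionally protects against a null taskMetrics(). The same logic can be expressed with Option guards; a sketch under the assumption that it compiles inside an org.apache.spark subpackage, since TaskMetrics.incDiskBytesSpilled is private[spark] (which is precisely why a trampoline object like this exists). The object and method names here are hypothetical:

```scala
package org.apache.spark.sql.rapids.execution

import org.apache.spark.TaskContext

// Hypothetical helper showing the same null-safe pattern with Option guards.
object SafeSpillMetrics {
  def incDiskBytesSpilledSafely(amountSpilled: Long): Unit = {
    for {
      tc <- Option(TaskContext.get)       // null outside a running task
      metrics <- Option(tc.taskMetrics()) // defensively guarded, as in this PR
    } metrics.incDiskBytesSpilled(amountSpilled)
  }
}
```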