-
Notifications
You must be signed in to change notification settings - Fork 240
/
GpuDeviceManager.scala
351 lines (315 loc) · 14.4 KB
/
GpuDeviceManager.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids
import java.util.concurrent.ThreadFactory
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import ai.rapids.cudf._
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.sql.rapids.GpuShuffleEnv
/** Lifecycle state of the once-per-executor GPU memory setup tracked by `GpuDeviceManager`. */
sealed trait MemoryState
/** Memory has not been set up yet, or a shutdown ran to completion. */
private case object Uninitialized extends MemoryState
/** Memory setup finished successfully. */
private case object Initialized extends MemoryState
/** A shutdown was started but did not run to completion. */
private case object Errored extends MemoryState
object GpuDeviceManager extends Logging {
// Controls where RMM/pinned memory gets initialized: from the task when true,
// otherwise from the executor side plugin. The value is read once from the JVM
// system property below, so it is false (executor plugin) unless that property is "true".
// Declared as a var for testing purposes only!
var rmmTaskInitEnabled: Boolean =
  java.lang.Boolean.getBoolean("com.nvidia.spark.rapids.memory.gpu.rmm.init.task")

/**
 * Overrides [[rmmTaskInitEnabled]]. For testing only.
 *
 * @param enabled true to initialize memory from the task, false to leave it to the plugin
 */
def setRmmTaskInitEnabled(enabled: Boolean): Unit = {
  rmmTaskInitEnabled = enabled
}
// Per-thread flag set by initializeFromTask once it has run on the calling thread.
// An unset ThreadLocal[Boolean] reads as false.
private val threadGpuInitialized: ThreadLocal[Boolean] = new ThreadLocal[Boolean]()

// State of the once-per-executor memory setup. Volatile because initializeMemory
// reads it before taking the GpuDeviceManager lock.
@volatile private var singletonMemoryInitialized: MemoryState = Uninitialized

// GPU id recorded by initializeRmm; None until RMM initialization has reached that point.
@volatile private var deviceId: Option[Int] = None

/**
 * Exposes the device id used while initializing the RMM pool.
 *
 * @return the recorded device id, or None if RMM initialization has not recorded one
 */
def getDeviceId(): Option[Int] = {
  deviceId
}
/**
 * Attempts to set the GPU device to `addr` and acquire it.
 *
 * @param addr ordinal of the GPU to try
 * @return true if the device was acquired, false if a non-fatal error was raised while trying
 */
def tryToSetGpuDeviceAndAcquire(addr: Int): Boolean = {
  try {
    GpuDeviceManager.setGpuDeviceAndAcquire(addr)
    true
  } catch {
    case NonFatal(e) =>
      // we may have lost a race trying to acquire this addr or GPU is already busy
      logInfo(s"Will not use GPU $addr because of $e")
      false
  }
}
/**
 * Scans every visible GPU, in ordinal order, and acquires the first one that can be acquired.
 *
 * This is just being extra paranoid when we are running on GPUs in exclusive mode. It is not
 * clear exactly what cuda interactions cause us to acquire a device, so this will force us
 * to do an interaction we know will acquire the device.
 *
 * @return the ordinal of the GPU that was acquired
 * @throws IllegalStateException if no GPU could be acquired after scanning all devices twice
 */
private def findGpuAndAcquire(): Int = {
  val candidates = 0 until Cuda.getDeviceCount()

  // Scan the full candidate list up to `passesLeft` times, in case a GPU was released or
  // something unexpected happened that kept us from acquiring one on the first pass.
  @scala.annotation.tailrec
  def scan(passesLeft: Int): Int = {
    if (passesLeft <= 0) {
      throw new IllegalStateException("Could not find a single GPU to use")
    } else {
      candidates.find(tryToSetGpuDeviceAndAcquire) match {
        case Some(acquired) => acquired
        case None => scan(passesLeft - 1)
      }
    }
  }

  scan(2)
}
/**
 * Makes `addr` the current CUDA device and forces the device to actually be acquired.
 *
 * @param addr ordinal of the GPU to use
 * @return `addr`, unchanged
 */
def setGpuDeviceAndAcquire(addr: Int): Int = {
  logDebug(s"Initializing GPU device ID to $addr")
  Cuda.setDevice(addr)
  // cudaFree(0) actually allocates the device that was just set. No process-exclusive
  // handling is required because we rely on Spark to schedule the GPU properly and not
  // hand it to multiple executors.
  Cuda.freeZero()
  addr
}
/**
 * Looks up the GPU address Spark assigned under the configured GPU resource name.
 *
 * @param resources resources assigned by Spark, keyed by resource name
 * @param conf      config supplying the name of the Spark GPU resource
 * @return the assigned GPU address, or None if `resources` has no entry for the GPU resource
 * @throws IllegalArgumentException if more than one GPU address was assigned
 */
def getGPUAddrFromResources(resources: Map[String, ResourceInformation],
    conf: RapidsConf): Option[Int] = {
  val sparkGpuResourceName = conf.getSparkGpuResourceName
  resources.get(sparkGpuResourceName).map { gpuResource =>
    logDebug(s"Spark resources contain: $sparkGpuResourceName")
    val assigned = gpuResource.addresses
    if (assigned.length > 1) {
      // We assume one GPU per executor. If Spark allocated several, different tasks could be
      // assigned different GPUs while RMM is only initialized for one of them, or we could
      // just get weird results that are hard to debug.
      throw new IllegalArgumentException("Spark GPU Plugin only supports 1 gpu per executor")
    }
    assigned.head.toInt
  }
}
/**
 * Initializes the GPU if Spark assigned one.
 *
 * @param resources resources assigned by Spark, keyed by resource name
 * @param conf      config supplying the name of the Spark GPU resource
 * @return the GPU addr Spark assigned (after setting and acquiring it), or None if Spark
 *         didn't assign one
 */
def initializeGpu(resources: Map[String, ResourceInformation], conf: RapidsConf): Option[Int] = {
  val assignedAddr = getGPUAddrFromResources(resources, conf)
  assignedAddr.map(addr => setGpuDeviceAndAcquire(addr))
}
/**
 * Sets the GPU Spark assigned (if any) and then initializes GPU memory, but only when SQL
 * execution on the GPU is enabled in `conf`.
 *
 * @param resources resources assigned by Spark, keyed by resource name
 * @param conf      config consulted for the execute-on-GPU flag and the GPU resource name
 */
def initializeGpuAndMemory(resources: Map[String, ResourceInformation],
    conf: RapidsConf): Unit = {
  // As long as we are in execute mode initialize everything, because it could be enabled
  // after startup.
  if (conf.isSqlExecuteOnGPU) {
    // The GPU is set before RMM is initialized so that RMM uses the GPU Spark provided.
    // RMM only needs to be initialized once per executor because we rely on there being
    // only 1 GPU per executor. If Spark didn't provide an address the result is None and
    // initializeMemory picks a GPU itself.
    initializeMemory(initializeGpu(resources, conf))
  }
}
/**
 * Tears down the buffer catalog, the shuffle environment and RMM, in that order, while
 * holding this object's monitor (the same monitor initializeMemory locks on).
 *
 * The memory state is set to Errored before any teardown step runs and is only reset to
 * Uninitialized after all of them return. If a step throws, the state stays Errored and a
 * later initializeMemory call fails with an IllegalStateException instead of re-initializing.
 */
def shutdown(): Unit = synchronized {
// assume error during shutdown until we complete it
singletonMemoryInitialized = Errored
RapidsBufferCatalog.close()
GpuShuffleEnv.shutdown()
Rmm.shutdown()
// every teardown step returned normally, so memory may be initialized again
singletonMemoryInitialized = Uninitialized
}
/**
 * Fetches the resources of the task running on the current thread.
 *
 * @return the task's resources, or an empty map when there is no current TaskContext
 */
def getResourcesFromTaskContext: Map[String, ResourceInformation] = {
  Option(TaskContext.get()).fold(Map.empty[String, ResourceInformation])(_.resources())
}
/**
 * Per-thread, one-time GPU setup done from a task: always sets the GPU if Spark assigned
 * one, and additionally initializes RMM when `rmmTaskInitEnabled` is set.
 * We expect the plugin to be run with 1 task and 1 GPU per executor.
 *
 * Calls after the first successful one on the same thread do nothing.
 */
def initializeFromTask(): Unit = {
  val alreadyDone = threadGpuInitialized.get()
  if (!alreadyDone) {
    val resources = getResourcesFromTaskContext
    val conf = new RapidsConf(SparkEnv.get.conf)
    if (!rmmTaskInitEnabled) {
      // just set the device if provided so task thread uses right GPU
      initializeGpu(resources, conf)
    } else {
      initializeGpuAndMemory(resources, conf)
    }
    threadGpuInitialized.set(true)
  }
}
/**
 * Converts a byte count to mebibytes for log and error messages.
 *
 * The division is done entirely in floating point. Dividing by the Long 1024 first would
 * be an integer division that silently drops up to 1023 bytes before the conversion.
 *
 * @param x size in bytes
 * @return `x` expressed in units of 1024 * 1024 bytes
 */
private def toMB(x: Long): Double = x / (1024.0 * 1024.0)
/**
 * Computes the size, in bytes, of the RMM pool to allocate.
 *
 * When `conf.rmmExactAlloc` is set, that amount (rounded down to a multiple of 512 bytes)
 * is returned with no further checks. Otherwise the size is `conf.rmmAllocFraction` of the
 * free memory left after subtracting the reserve, rounded down to a multiple of 512 bytes,
 * validated against the min/max fractions of total memory, and finally lowered if needed so
 * that the reserve still fits under the maximum.
 *
 * @param conf config supplying the allocation fractions, reserve and pool settings
 * @param info free and total memory of the GPU
 * @return the pool size in bytes
 * @throws IllegalArgumentException if the computed size is below the minimum allocation,
 *                                  above the maximum allocation, or if the reserve is not
 *                                  smaller than the maximum allocation
 */
private def computeRmmPoolSize(conf: RapidsConf, info: CudaMemInfo): Long = {
  // Clears the low 9 bits, i.e. rounds down to a multiple of 512 bytes.
  def truncateToAlignment(x: Long): Long = x & ~511L

  // No checks when rmmExactAlloc is given. We are just going to go with the amount requested
  // with the proper alignment (which is a requirement for some allocators). This is because
  // it is for testing and we assume that the tests know what they are doing. If the conf becomes
  // public, then we need to do some more work.
  conf.rmmExactAlloc.map(truncateToAlignment).getOrElse {
    val minAllocation = truncateToAlignment((conf.rmmAllocMinFraction * info.total).toLong)
    val maxAllocation = truncateToAlignment((conf.rmmAllocMaxFraction * info.total).toLong)
    val reserveAmount = if (GpuShuffleEnv.shouldUseRapidsShuffle(conf)
        && conf.rmmPool.equalsIgnoreCase("ASYNC")) {
      // When using the async allocator, UCX calls `cudaMalloc` directly to allocate the
      // bounce buffers.
      conf.rmmAllocReserve + conf.shuffleUcxBounceBuffersSize * 2
    } else {
      conf.rmmAllocReserve
    }
    val poolAllocation = truncateToAlignment(
      (conf.rmmAllocFraction * (info.free - reserveAmount)).toLong)
    if (poolAllocation < minAllocation) {
      throw new IllegalArgumentException(s"The pool allocation of " +
        s"${toMB(poolAllocation)} MB (calculated from ${RapidsConf.RMM_ALLOC_FRACTION} " +
        s"(=${conf.rmmAllocFraction}) and ${toMB(info.free)} MB free memory) was less than " +
        s"the minimum allocation of ${toMB(minAllocation)} (calculated from " +
        s"${RapidsConf.RMM_ALLOC_MIN_FRACTION} (=${conf.rmmAllocMinFraction}) " +
        s"and ${toMB(info.total)} MB total memory)")
    }
    if (maxAllocation < poolAllocation) {
      throw new IllegalArgumentException(s"The pool allocation of " +
        s"${toMB(poolAllocation)} MB (calculated from ${RapidsConf.RMM_ALLOC_FRACTION} " +
        s"(=${conf.rmmAllocFraction}) and ${toMB(info.free)} MB free memory) was more than " +
        s"the maximum allocation of ${toMB(maxAllocation)} (calculated from " +
        s"${RapidsConf.RMM_ALLOC_MAX_FRACTION} (=${conf.rmmAllocMaxFraction}) " +
        s"and ${toMB(info.total)} MB total memory)")
    }
    if (reserveAmount >= maxAllocation) {
      // Report the max fraction here: it is the setting named in the message and the one
      // maxAllocation was derived from.
      throw new IllegalArgumentException(s"RMM reserve memory (${toMB(reserveAmount)} MB) " +
        s"larger than maximum pool size (${toMB(maxAllocation)} MB). Check the settings for " +
        s"${RapidsConf.RMM_ALLOC_MAX_FRACTION} (=${conf.rmmAllocMaxFraction}) and " +
        s"${RapidsConf.RMM_ALLOC_RESERVE} (=$reserveAmount)")
    }
    val adjustedMaxAllocation = truncateToAlignment(maxAllocation - reserveAmount)
    if (poolAllocation > adjustedMaxAllocation) {
      logWarning(s"RMM pool allocation (${toMB(poolAllocation)} MB) does not leave enough free " +
        s"memory for reserve memory (${toMB(reserveAmount)} MB), lowering the pool size to " +
        s"${toMB(adjustedMaxAllocation)} MB to accommodate the requested reserve amount.")
      adjustedMaxAllocation
    } else {
      poolAllocation
    }
  }
}
/**
 * Initializes RMM on `gpuId` unless RMM is already initialized, in which case this does
 * nothing.
 *
 * In order: initializes the shuffle environment, computes the pool size, derives the RMM
 * allocation mode and logging target from the config, records `gpuId` as the device id,
 * sets the CUDA device, initializes RMM and finally initializes the buffer catalog.
 *
 * @param gpuId      id of the GPU to initialize RMM on
 * @param rapidsConf config to use; when None a new one is built from the SparkEnv conf
 * @throws IllegalArgumentException if pooling is enabled and the configured RMM pool name
 *                                  is not one of default, arena, async or none
 */
private def initializeRmm(gpuId: Int, rapidsConf: Option[RapidsConf] = None): Unit = {
  if (!Rmm.isInitialized) {
    val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
    val info = Cuda.memGetInfo()

    // We need to reserve more memory when RAPIDS shuffle is enabled and we are using the CUDA
    // async allocator, so initialize the shuffle environment first.
    GpuShuffleEnv.init(conf)

    val poolAllocation = computeRmmPoolSize(conf, info)
    // Human readable names of the enabled features, only used in the log line below.
    val features = ArrayBuffer[String]()

    val baseMode = RmmAllocationMode.CUDA_DEFAULT
    val poolMode = if (conf.isPooledMemEnabled) {
      conf.rmmPool match {
        case c if "default".equalsIgnoreCase(c) =>
          if (Cuda.isPtdsEnabled()) {
            logWarning("Configuring the DEFAULT allocator with a CUDF built for " +
              "Per-Thread Default Stream (PTDS). This is known to be unstable! " +
              "We recommend you use the ARENA allocator when PTDS is enabled.")
          }
          features += "POOLED"
          baseMode | RmmAllocationMode.POOL
        case c if "arena".equalsIgnoreCase(c) =>
          features += "ARENA"
          baseMode | RmmAllocationMode.ARENA
        case c if "async".equalsIgnoreCase(c) =>
          features += "ASYNC"
          baseMode | RmmAllocationMode.CUDA_ASYNC
        case c if "none".equalsIgnoreCase(c) =>
          // Pooling is disabled.
          baseMode
        case c =>
          throw new IllegalArgumentException(s"RMM pool set to '$c' is not supported.")
      }
    } else {
      if (!"none".equalsIgnoreCase(conf.rmmPool)) {
        logWarning("RMM pool is disabled since spark.rapids.memory.gpu.pooling.enabled is set " +
          "to false; however, this configuration is deprecated and the behavior may change in a " +
          "future release.")
      }
      baseMode
    }

    val init = if (conf.isUvmEnabled) {
      features += "UVM"
      poolMode | RmmAllocationMode.CUDA_MANAGED_MEMORY
    } else {
      poolMode
    }

    // null means RMM logging stays off.
    val logConf: Rmm.LogConf = conf.rmmDebugLocation match {
      case c if "none".equalsIgnoreCase(c) => null
      case c if "stdout".equalsIgnoreCase(c) =>
        features += "LOG: STDOUT"
        Rmm.logToStdout()
      case c if "stderr".equalsIgnoreCase(c) =>
        features += "LOG: STDERR"
        // Must be the stderr target; using the stdout one here would contradict both the
        // configured location and the feature name logged above.
        Rmm.logToStderr()
      case c =>
        logWarning(s"RMM logging set to '$c' is not supported and is being ignored.")
        null
    }

    deviceId = Some(gpuId)

    logInfo(s"Initializing RMM${features.mkString(" ", " ", "")} " +
      s"pool size = ${toMB(poolAllocation)} MB on gpuId $gpuId")

    if (Cuda.isPtdsEnabled()) {
      logInfo("Using per-thread default stream")
    } else {
      logInfo("Using legacy default stream")
    }

    Cuda.setDevice(gpuId)
    Rmm.initialize(init, logConf, poolAllocation)
    RapidsBufferCatalog.init(conf)
  }
}
/**
 * Initializes the pinned memory pool when it is not initialized yet and the configured
 * pinned pool size is positive; otherwise does nothing.
 *
 * @param gpuId      id of the GPU handed to the pinned memory pool
 * @param rapidsConf config to use; when None a new one is built from the SparkEnv conf
 */
private def allocatePinnedMemory(gpuId: Int, rapidsConf: Option[RapidsConf] = None): Unit = {
  val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
  if (!PinnedMemoryPool.isInitialized) {
    val poolBytes = conf.pinnedPoolSize
    if (poolBytes > 0) {
      logInfo(s"Initializing pinned memory pool (${poolBytes / 1024 / 1024.0} MB)")
      PinnedMemoryPool.initialize(poolBytes, gpuId)
    }
  }
}
/**
 * Initialize the GPU memory for gpuId according to the settings in rapidsConf. It is assumed
 * that if gpuId is set then that gpu is already the default device. If gpuId is not set
 * this will search all available GPUs starting at 0 looking for the appropriate one.
 *
 * Does nothing when memory is already initialized.
 *
 * @param gpuId the id of the gpu to use
 * @param rapidsConf the config to use.
 * @throws IllegalStateException if a previous shutdown did not complete
 */
def initializeMemory(gpuId: Option[Int], rapidsConf: Option[RapidsConf] = None): Unit = {
  // Unlocked fast path: skip the lock entirely once memory has been initialized.
  if (singletonMemoryInitialized != Initialized) {
    // Memory and memory related components only need to be initialized once per executor.
    // The lock keeps multiple tasks from initializing them at the same time, so the state
    // is examined again once the lock is held.
    GpuDeviceManager.synchronized {
      singletonMemoryInitialized match {
        case Errored =>
          throw new IllegalStateException(
            "Cannot initialize memory due to previous shutdown failing")
        case Uninitialized =>
          val gpu = gpuId.getOrElse(findGpuAndAcquire())
          initializeRmm(gpu, rapidsConf)
          allocatePinnedMemory(gpu, rapidsConf)
          singletonMemoryInitialized = Initialized
        case Initialized =>
          // initialization was completed while this thread waited for the lock
          ()
      }
    }
  }
}
/**
 * Wrap a thread factory with one that will set the GPU device on each thread created.
 *
 * The device id is read once, when this method is called, and reused for every thread the
 * returned factory creates.
 *
 * @param factory the factory that actually creates the threads
 * @return a factory whose threads set the CUDA device before running their runnable
 * @throws IllegalStateException if no device id has been recorded yet
 */
def wrapThreadFactory(factory: ThreadFactory): ThreadFactory = {
  val devId = getDeviceId().getOrElse {
    throw new IllegalStateException("Device ID is not set")
  }
  new ThreadFactory {
    override def newThread(runnable: Runnable): Thread = {
      val deviceBoundTask = new Runnable {
        override def run(): Unit = {
          Cuda.setDevice(devId)
          runnable.run()
        }
      }
      factory.newThread(deviceBoundTask)
    }
  }
}
}