Skip to content

Commit

Permalink
Unshim ExclusiveModeGpuDiscoveryPlugin (#3590)
Browse files Browse the repository at this point in the history
* Unshim ExclusiveModeGpuDiscoveryPlugin

As one of the publicly documented classes loaded by Spark outside the
plugin control, ExclusiveModeGpuDiscoveryPlugin may not reside in the
Parallel World locations. Binary dedupe shows that
ExclusiveModeGpuDiscoveryPlugin is bitwise-identical across all shims.

Signed-off-by: Gera Shegalov <[email protected]>

* Delay ExclusiveModeGpuDiscoveryPlugin entry point

via ShimLoader

Signed-off-by: Gera Shegalov <[email protected]>

* Fix scalastyle
  • Loading branch information
gerashegalov authored Sep 22, 2021
1 parent fd4c9bc commit f4465cb
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 46 deletions.
5 changes: 3 additions & 2 deletions dist/unshimmed-common-from-spark301.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ META-INF/DEPENDENCIES
META-INF/LICENSE
META-INF/NOTICE
META-INF/maven/**
com/nvidia/spark/ExclusiveModeGpuDiscoveryPlugin*
com/nvidia/spark/RapidsUDF*
com/nvidia/spark/SQLPlugin*
com/nvidia/spark/rapids/ExecutionPlanCaptureCallback*
com/nvidia/spark/rapids/GpuKryoRegistrator*
com/nvidia/spark/rapids/PlanUtils*
com/nvidia/spark/rapids/RapidsExecutorHeartbeatMsg*
com/nvidia/spark/rapids/RapidsExecutorStartupMsg*
com/nvidia/spark/rapids/RapidsExecutorUpdateMsg*
Expand All @@ -23,4 +24,4 @@ org/apache/spark/sql/rapids/ProxyRapidsShuffleInternalManagerBase*
org/apache/spark/sql/rapids/VisibleShuffleManager*
org/openucx/**
rapids/*.py
rapids4spark-version-info.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,14 +18,10 @@ package com.nvidia.spark

import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Cuda
import com.nvidia.spark.rapids.GpuDeviceManager
import com.nvidia.spark.rapids.ShimLoader

import org.apache.spark.SparkConf
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}

/**
Expand All @@ -39,44 +35,12 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequest}
* This plugin can be activated in spark with the configuration:
* `--conf spark.resources.discoveryPlugin=com.nvidia.spark.ExclusiveModeGpuDiscoveryPlugin`
*/
class ExclusiveModeGpuDiscoveryPlugin extends ResourceDiscoveryPlugin with Proxy {
  /**
   * Delegates GPU discovery to the shim-loaded internal implementation.
   *
   * This class is a publicly documented entry point that Spark instantiates
   * outside the plugin's control, so it must stay unshimmed; all real work is
   * deferred to `self`, which is resolved through ShimLoader.
   *
   * @param request   the resource request Spark is trying to satisfy
   * @param sparkConf the active Spark configuration
   * @return the discovered resource information, or empty if not handled
   */
  override def discoverResource(
      request: ResourceRequest,
      sparkConf: SparkConf
  ): Optional[ResourceInformation] = self.discoverResource(request, sparkConf)

  // Lazily resolved so that shim class loading is delayed until the plugin
  // is actually invoked (see commit message: "Delay ExclusiveModeGpuDiscoveryPlugin
  // entry point via ShimLoader").
  override lazy val self: ResourceDiscoveryPlugin =
    ShimLoader.newInternalExclusiveModeGpuDiscoveryPlugin()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Cuda

import org.apache.spark.SparkConf
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}

/**
 * Shim-internal implementation backing ExclusiveModeGpuDiscoveryPlugin.
 * Attempts to acquire the requested number of GPUs in exclusive mode by
 * trying to set and acquire each visible CUDA device in turn.
 */
class InternalExclusiveModeGpuDiscoveryPlugin extends ResourceDiscoveryPlugin with Logging {
// Discovers up to `request.amount` GPUs; returns the acquired device addresses.
// Returns Optional.empty() for any resource other than "gpu".
override def discoverResource(
request: ResourceRequest,
sparkconf: SparkConf
): Optional[ResourceInformation] = {

// Only GPU requests are handled here; bail out early for anything else.
val resourceName = request.id.resourceName
if (!resourceName.equals("gpu")) {
logInfo("ExclusiveModeGpuDiscoveryPlugin only handles gpu allocations, " +
s"skipping $resourceName")
return Optional.empty()
}
val ngpusRequested = request.amount
// Number of GPUs physically visible on this host (via cudf's CUDA binding).
val deviceCount: Int = Cuda.getDeviceCount
logInfo(s"Running ExclusiveModeGpuDiscoveryPlugin to acquire $ngpusRequested GPU(s), " +
s"host has $deviceCount GPU(s)")
// loop multiple times to see if a GPU was released or something unexpected happened that
// we couldn't acquire on first try
var numRetries = 2
// Addresses successfully acquired so far (as strings, the form Spark expects).
val allocatedAddrs = ArrayBuffer[String]()
// Candidate device ordinals still worth trying; shrinks as devices are acquired.
val addrsToTry = ArrayBuffer.empty ++= (0 until deviceCount)
while (numRetries > 0 && allocatedAddrs.size < ngpusRequested && addrsToTry.nonEmpty) {
var addrLoc = 0
// Snapshot the candidate count for this pass; the buffer itself is only
// trimmed after the inner loop completes.
val allAddrs = addrsToTry.size
while (addrLoc < allAddrs && allocatedAddrs.size < ngpusRequested) {
val addr = addrsToTry(addrLoc)
// Acquisition succeeds only if no other process holds the device
// (exclusive mode); failures are retried on the next outer pass.
if (GpuDeviceManager.tryToSetGpuDeviceAndAcquire(addr)) {
allocatedAddrs += addr.toString
}
addrLoc += 1
}
// Drop already-acquired devices from the retry candidates.
addrsToTry --= allocatedAddrs.map(_.toInt)
numRetries -= 1
}
if (allocatedAddrs.size < ngpusRequested) {
// log warning here, Spark will throw exception if we return not enough
logWarning(s"ExclusiveModeGpuDiscoveryPlugin did not find enough gpus, " +
s"requested: $ngpusRequested found: ${allocatedAddrs.size}")
}
// Return whatever was acquired, even if fewer than requested; Spark decides
// whether that is fatal.
Optional.of(new ResourceInformation("gpu", allocatedAddrs.toArray))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.{SPARK_BUILD_USER, SPARK_VERSION, SparkConf, SparkEnv}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin}
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -339,4 +340,7 @@ object ShimLoader extends Logging {
newInstanceOf("com.nvidia.spark.udf.LogicalPlanRules")
}

/**
 * Reflectively instantiates the shim-internal exclusive-mode GPU discovery
 * plugin from the Parallel World class loader, keeping the unshimmed public
 * entry point free of any direct dependency on shimmed classes.
 */
def newInternalExclusiveModeGpuDiscoveryPlugin(): ResourceDiscoveryPlugin = {
newInstanceOf("com.nvidia.spark.rapids.InternalExclusiveModeGpuDiscoveryPlugin")
}
}

0 comments on commit f4465cb

Please sign in to comment.