
Support for Heterogeneous environment #38

Merged 2 commits on Apr 21, 2017
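This PR lets GPUEnabler degrade gracefully on nodes without a CUDA-capable GPU: initialization logs instead of throwing when the native jCuda libraries are missing, and the map/reduce paths fall back to plain CPU execution. The sketch below is a minimal usage example in the spirit of the project README, assuming the CUDAFunction constructor and mapExtFunc signature documented there; the kernel symbol, PTX resource, and column orders are illustrative placeholders, not taken from this PR.

import org.apache.spark.{SparkConf, SparkContext}
import com.ibm.gpuenabler.CUDAFunction
import com.ibm.gpuenabler.CUDARDDImplicits._

object HeterogeneousExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GPUEnabler heterogeneous sketch"))

    // Kernel symbol and PTX resource are placeholders; substitute the ones
    // built from your own .cu file (see the Makefile added in this PR).
    val ptxURL = getClass.getResource("/testCUDAKernels.ptx")
    val mapFunction = new CUDAFunction("multiplyBy2", Array("this"), Array("this"), ptxURL)

    val doubled = sc.parallelize(1 to 1000, 4)
      .mapExtFunc((x: Int) => 2 * x, mapFunction)

    // On a GPU node the kernel runs on the device; on a CPU-only node
    // (isGPUEnabled == false after this PR) the same job runs the Scala lambda.
    println(doubled.sum())
  }
}

reduceExtFunc follows the same pattern on the reduce side, with the CPU fallback shown in the CUDARDDUtils changes below.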
32 changes: 21 additions & 11 deletions gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala
@@ -17,45 +17,54 @@

package com.ibm.gpuenabler

import java.util.Date
import java.net.URL

import jcuda.Pointer
import jcuda.driver.JCudaDriver._
import jcuda.driver.{CUdeviceptr, CUmodule, JCudaDriver}
import jcuda.runtime.JCuda
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkException
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.mutable.HashMap
import scala.collection.mutable
import java.text.SimpleDateFormat

private[gpuenabler] object CUDAManagerCachedModule {
private val cachedModules = new HashMap[(String, Int), CUmodule]
def getInstance() : HashMap[(String, Int), CUmodule] = { cachedModules }
}

private[gpuenabler] class CUDAManager {
// Initialization
// This is supposed to be called before ANY other JCuda* call to ensure we have properly loaded
// native jCuda library and cuda context
private[gpuenabler] var isGPUEnabled = false
try {
JCudaDriver.setExceptionsEnabled(true)
JCudaDriver.cuInit(0)
isGPUEnabled = true
} catch {
case ex: UnsatisfiedLinkError =>
throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " +
"not detected - make sure Driver and Executors are able to load them", ex)
case ex: UnsatisfiedLinkError =>
CUDAManager.logger.info("Could not initialize CUDA, because native jCuda libraries were " +
"not detected - continue to use CPU for execution")
case ex: NoClassDefFoundError =>
throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " +
"not detected - make sure Driver and Executors are able to load them", ex)

CUDAManager.logger.info("Could not initialize CUDA, because native jCuda libraries were " +
"not detected - continue to use CPU for execution")
case ex: Throwable =>
throw new SparkException("Could not initialize CUDA because of unknown reason", ex)
}

// Returns the number of GPUs available in this node
private[gpuenabler] def gpuCount = {
val count = new Array[Int](1)
cuDeviceGetCount(count)
count(0)
}

// private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = {
// TODO : change it back to private after development
def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = {
private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = {
var resourceURL: URL = null
var key: String = null
var ptxString: String = null
@@ -71,14 +80,14 @@ private[gpuenabler] class CUDAManager {

val devIx = new Array[Int](1)
JCuda.cudaGetDevice(devIx)

synchronized {
// Since multiple modules cannot be loaded into one context in runtime API,
// we use singleton cache http://stackoverflow.com/questions/32502375/
// loading-multiple-modules-in-jcuda-is-not-working
// TODO support loading multiple ptxs
// http://stackoverflow.com/questions/32535828/jit-in-jcuda-loading-multiple-ptx-modules
CUDAManagerCachedModule.getInstance.getOrElseUpdate((key, devIx(0)), {
// println(" MODULE LOAD ")
// TODO maybe unload the module if it won't be needed later
var moduleBinaryData: Array[Byte] = null
if (resourceURL != null) {
@@ -94,6 +103,7 @@
moduleBinaryData0(moduleBinaryData.length) = 0
val module = new CUmodule
JCudaDriver.cuModuleLoadData(module, moduleBinaryData0)
CUDAManagerCachedModule.getInstance.put((key, devIx(0)), module)
module
})
}
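The key behavioural change in CUDAManager above: missing native jCuda libraries (UnsatisfiedLinkError or NoClassDefFoundError) no longer abort the application with a SparkException; the manager records isGPUEnabled = false and callers fall back to CPU execution. A standalone sketch of the same probe-and-fall-back pattern, with hypothetical class names that are not part of the PR:

import jcuda.driver.JCudaDriver
import org.slf4j.LoggerFactory

// Minimal sketch: try to initialise the CUDA driver once, remember the
// outcome, and let callers branch on it instead of failing the whole job.
class GpuProbe {
  private val logger = LoggerFactory.getLogger(classOf[GpuProbe])

  val isGPUEnabled: Boolean =
    try {
      JCudaDriver.setExceptionsEnabled(true)
      JCudaDriver.cuInit(0)
      true
    } catch {
      case _: UnsatisfiedLinkError | _: NoClassDefFoundError =>
        logger.info("Native jCuda libraries not found - continuing on CPU")
        false
    }

  // Run the GPU path when available, otherwise the CPU path.
  def run[A](onGpu: => A)(onCpu: => A): A =
    if (isGPUEnabled) onGpu else onCpu
}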
95 changes: 57 additions & 38 deletions gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala
@@ -85,31 +85,35 @@ private[gpuenabler] class MapGPUPartitionsRDD[U: ClassTag, T: ClassTag](
private val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[U]

override def compute(split: Partition, context: TaskContext): Iterator[U] = {
// Use the block ID of this particular (rdd, partition)
val blockId = RDDBlockId(this.id, split.index)
if (GPUSparkEnv.get.isGPUEnabled) {
// Use the block ID of this particular (rdd, partition)
val blockId = RDDBlockId(this.id, split.index)

// Handle empty partitions.
if (firstParent[T].iterator(split, context).length <= 0)
return new Array[U](0).toIterator
// Handle empty partitions.
if (firstParent[T].iterator(split, context).length <= 0)
return new Array[U](0).toIterator

val inputHyIter = firstParent[T].iterator(split, context) match {
case hyIter: HybridIterator[T] => {
hyIter
}
case iter: Iterator[T] => {
// println("Converting Regular Iterator to hybridIterator")
val parentBlockId = RDDBlockId(firstParent[T].id, split.index)
val hyIter = new HybridIterator[T](iter.toArray, inputColSchema,
kernel.inputColumnsOrder, Some(parentBlockId))
hyIter
val inputHyIter = firstParent[T].iterator(split, context) match {
case hyIter: HybridIterator[T] => {
hyIter
}
case iter: Iterator[T] => {
// println("Converting Regular Iterator to hybridIterator")
val parentBlockId = RDDBlockId(firstParent[T].id, split.index)
val hyIter = new HybridIterator[T](iter.toArray, inputColSchema,
kernel.inputColumnsOrder, Some(parentBlockId))
hyIter
}
}
}

val resultIter = kernel.compute[U, T](inputHyIter,
Seq(inputColSchema, outputColSchema), None,
outputArraySizes, inputFreeVariables, Some(blockId))
val resultIter = kernel.compute[U, T](inputHyIter,
Seq(inputColSchema, outputColSchema), None,
outputArraySizes, inputFreeVariables, Some(blockId))

resultIter
resultIter
} else {
f(context, split.index, firstParent[T].iterator(split, context))
}
}
}

@@ -315,25 +319,39 @@ object CUDARDDImplicits {

val cleanF = CUDAUtils.cleanFn(sc, f) // sc.clean(f)

val inputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]
val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]

val reducePartition: (TaskContext, Iterator[T]) => Option[T] =
(ctx: TaskContext, data: Iterator[T]) => {
data match {
case col: HybridIterator[T] =>
if (col.numElements != 0) {
val colIter = extfunc.compute[T, T](col, Seq(inputColSchema, outputColSchema),
Some(1), outputArraySizes,
inputFreeVariables, None).asInstanceOf[HybridIterator[T]]
Some(colIter.next)
} else {
None
}
// Handle partitions with no data
case _ => None
}
val reducePartition: (TaskContext, Iterator[T]) => Option[T] = (ctx: TaskContext, data: Iterator[T]) => {
if (GPUSparkEnv.get.isGPUEnabled) {
val inputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]
val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]

data match {
case col: HybridIterator[T] =>
if (col.numElements != 0) {
val colIter = extfunc.compute[T, T](col, Seq(inputColSchema, outputColSchema),
Some(1), outputArraySizes,
inputFreeVariables, None).asInstanceOf[HybridIterator[T]]
Some(colIter.next)
} else {
None
}
// Handle partitions with no data
case _ =>
if (data.hasNext) {
Some(data.reduceLeft(cleanF))
}
else {
None
}
}
} else {
if (data.hasNext) {
Some(data.reduceLeft(cleanF))
}
else {
None
}
}
}

var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
@@ -344,6 +362,7 @@
}
}
}

sc.runJob(rdd, reducePartition, 0 until rdd.partitions.length, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
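In CUDARDDUtils, both the map path (compute) and the reduce path (reducePartition) now branch on GPUSparkEnv.get.isGPUEnabled: when it is false, or when a partition arrives as a plain Iterator rather than a HybridIterator, the partition is reduced with the user's Scala function. A toy sketch of that fallback shape, with illustrative names that are not the project's API:

// Toy sketch of the reduce-side fallback: prefer a device-backed reduction
// when available, otherwise reduce the partition with the plain function.
def reducePartitionSketch[T](
    gpuAvailable: Boolean,
    data: Iterator[T],
    gpuReduce: Iterator[T] => Option[T],
    f: (T, T) => T): Option[T] =
  if (gpuAvailable) {
    gpuReduce(data)                  // e.g. run the CUDA reduction kernel
  } else if (data.hasNext) {
    Some(data.reduceLeft(f))         // CPU fallback, mirrors the else branch above
  } else {
    None                             // empty partition contributes nothing
  }

// reducePartitionSketch(gpuAvailable = false, Iterator(1, 2, 3), _ => None,
//                       (a: Int, b: Int) => a + b)   // == Some(6)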
13 changes: 11 additions & 2 deletions gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala
@@ -19,7 +19,7 @@ package com.ibm.gpuenabler

import org.apache.spark.SparkEnv
import org.apache.spark.gpuenabler.CUDAUtils._

import jcuda.runtime.JCuda

private[gpuenabler] class GPUSparkEnv() {

@@ -45,7 +45,8 @@ private[gpuenabler] class GPUSparkEnv() {
new GPUMemoryManagerMasterEndPoint(rpcEnv)),
isDriver,
isLocal)
val isGPUEnabled = (cudaManager != null)
val isGPUEnabled = if (cudaManager != null) cudaManager.isGPUEnabled else false
def gpuCount = if (isGPUEnabled) cudaManager.gpuCount else 0
val isGPUCodeGenEnabled =
isGPUEnabled && SparkEnv.get.conf.getBoolean("spark.gpu.codegen", false)
}
@@ -65,6 +66,14 @@ private[gpuenabler] object GPUSparkEnv extends _Logging {
oldSparkEnv = SparkEnv.get
initalize()
}

if (env.isGPUEnabled) {
val executorId = env.executorId match {
case "driver" => 0
case _ => SparkEnv.get.executorId.toInt
}
JCuda.cudaSetDevice(executorId % env.gpuCount )
}
env
}
}
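The GPUSparkEnv change above pins each executor to a GPU in round-robin fashion: the driver uses device 0 and executor N uses N % gpuCount, so with 2 GPUs per node executors 1, 2, 3, 4 land on devices 1, 0, 1, 0. A compact sketch of that assignment rule, as a hypothetical helper that is not part of the PR:

// Sketch of the round-robin device assignment introduced in GPUSparkEnv:
// the driver always gets device 0, executor N gets N modulo the GPU count.
def deviceFor(executorId: String, gpuCount: Int): Int = {
  require(gpuCount > 0, "no GPUs available")
  val ordinal = executorId match {
    case "driver" => 0
    case id       => id.toInt
  }
  ordinal % gpuCount
}

// deviceFor("driver", 2) == 0
// deviceFor("3", 2)      == 1
// The real code then calls JCuda.cudaSetDevice(...) with this value.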
15 changes: 15 additions & 0 deletions gpu-enabler/src/test/resources/Makefile
@@ -0,0 +1,15 @@
CUDA_PATH ?= /usr
CXX ?= g++
NVCC ?= $(CUDA_PATH)/bin/nvcc
COMPUTE_CAPABILITY ?= 35
CXXFLAGS ?= -m64 -O3 -Xcompiler -Wall --std=c++11 -g
NVCCFLAGS ?= -arch=sm_$(COMPUTE_CAPABILITY) -Xptxas="-v"


all : testCUDAKernels.ptx

testCUDAKernels.ptx: testCUDAKernels.cu
$(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ -c $^

clean:
rm -f testCUDAKernels.ptx