diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala
index e2ec120..c6537c6 100644
--- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala
+++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala
@@ -17,8 +17,8 @@ package com.ibm.gpuenabler
+import java.util.Date
 import java.net.URL
-
 import jcuda.Pointer
 import jcuda.driver.JCudaDriver._
 import jcuda.driver.{CUdeviceptr, CUmodule, JCudaDriver}
@@ -26,11 +26,12 @@ import jcuda.runtime.JCuda
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkException
 import org.slf4j.{Logger, LoggerFactory}
-
 import scala.collection.mutable.HashMap
+import scala.collection.mutable
+import java.text.SimpleDateFormat
 
 private[gpuenabler] object CUDAManagerCachedModule {
-  private val cachedModules = new HashMap[(String, Int), CUmodule]
+  private val cachedModules = new HashMap[(String, Int), CUmodule]
   def getInstance() : HashMap[(String, Int), CUmodule] = { cachedModules }
 }
 
@@ -38,24 +39,32 @@ private[gpuenabler] class CUDAManager {
   // Initialization
   // This is supposed to be called before ANY other JCuda* call to ensure we have properly loaded
   // native jCuda library and cuda context
+  private[gpuenabler] var isGPUEnabled = false
   try {
     JCudaDriver.setExceptionsEnabled(true)
     JCudaDriver.cuInit(0)
+    isGPUEnabled = true
   } catch {
-    case ex: UnsatisfiedLinkError =>
-      throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " +
-        "not detected - make sure Driver and Executors are able to load them", ex)
+    case ex: UnsatisfiedLinkError =>
+      CUDAManager.logger.info("Could not initialize CUDA, because native jCuda libraries were " +
+        "not detected - continue to use CPU for execution")
     case ex: NoClassDefFoundError =>
-      throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " +
-        "not detected - make sure Driver and Executors are able to load them", ex)
-
+      CUDAManager.logger.info("Could not initialize CUDA, because native jCuda libraries were " +
+        "not detected - continue to use CPU for execution")
     case ex: Throwable =>
       throw new SparkException("Could not initialize CUDA because of unknown reason", ex)
   }
 
+  // Returns the number of GPUs available in this node
+  private[gpuenabler] def gpuCount = {
+    val count = new Array[Int](1)
+    cuDeviceGetCount(count)
+    count(0)
+  }
+
   // private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = {
   // TODO : change it back to private after development
-  def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = {
+  private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = {
     var resourceURL: URL = null
     var key: String = null
    var ptxString: String = null
@@ -71,6 +80,7 @@ private[gpuenabler] class CUDAManager {
 
     val devIx = new Array[Int](1)
     JCuda.cudaGetDevice(devIx)
+
     synchronized {
       // Since multiple modules cannot be loaded into one context in runtime API,
       // we use singleton cache http://stackoverflow.com/questions/32502375/
@@ -78,7 +88,6 @@ private[gpuenabler] class CUDAManager {
       // TODO support loading multiple ptxs
       // http://stackoverflow.com/questions/32535828/jit-in-jcuda-loading-multiple-ptx-modules
       CUDAManagerCachedModule.getInstance.getOrElseUpdate((key, devIx(0)), {
-        // println(" MODULE LOAD ")
         // TODO maybe unload the module if it won't be needed later
         var moduleBinaryData: Array[Byte] = null
         if (resourceURL != null) {
@@ -94,6 +103,7 @@ private[gpuenabler] class CUDAManager {
         moduleBinaryData0(moduleBinaryData.length) = 0
         val module = new CUmodule
         JCudaDriver.cuModuleLoadData(module, moduleBinaryData0)
+        CUDAManagerCachedModule.getInstance.put((key, devIx(0)), module)
         module
       })
     }
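
Note on the initialization change above: a missing native jCuda library now logs a message and leaves isGPUEnabled false instead of throwing a SparkException, so the node falls back to CPU execution. A minimal standalone sketch of that probe pattern, assuming only that jcuda is on the classpath (GpuProbe and probeGpu are illustrative names, not part of this patch):

    import jcuda.driver.JCudaDriver

    object GpuProbe {
      // Returns true only when the CUDA driver initializes; a missing
      // native library is treated as "no GPU" rather than a fatal error.
      def probeGpu(): Boolean =
        try {
          JCudaDriver.setExceptionsEnabled(true)
          JCudaDriver.cuInit(0) // loads the native library on first use
          true
        } catch {
          case _: UnsatisfiedLinkError | _: NoClassDefFoundError => false
        }
    }
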
diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala
index 01e8215..ec6e496 100644
--- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala
+++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala
@@ -85,31 +85,35 @@ private[gpuenabler] class MapGPUPartitionsRDD[U: ClassTag, T: ClassTag](
   private val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[U]
 
   override def compute(split: Partition, context: TaskContext): Iterator[U] = {
-    // Use the block ID of this particular (rdd, partition)
-    val blockId = RDDBlockId(this.id, split.index)
+    if (GPUSparkEnv.get.isGPUEnabled) {
+      // Use the block ID of this particular (rdd, partition)
+      val blockId = RDDBlockId(this.id, split.index)
 
-    // Handle empty partitions.
-    if (firstParent[T].iterator(split, context).length <= 0)
-      return new Array[U](0).toIterator
+      // Handle empty partitions.
+      if (firstParent[T].iterator(split, context).length <= 0)
+        return new Array[U](0).toIterator
 
-    val inputHyIter = firstParent[T].iterator(split, context) match {
-      case hyIter: HybridIterator[T] => {
-        hyIter
-      }
-      case iter: Iterator[T] => {
-        // println("Converting Regular Iterator to hybridIterator")
-        val parentBlockId = RDDBlockId(firstParent[T].id, split.index)
-        val hyIter = new HybridIterator[T](iter.toArray, inputColSchema,
-          kernel.inputColumnsOrder, Some(parentBlockId))
-        hyIter
+      val inputHyIter = firstParent[T].iterator(split, context) match {
+        case hyIter: HybridIterator[T] => {
+          hyIter
+        }
+        case iter: Iterator[T] => {
+          // println("Converting Regular Iterator to hybridIterator")
+          val parentBlockId = RDDBlockId(firstParent[T].id, split.index)
+          val hyIter = new HybridIterator[T](iter.toArray, inputColSchema,
+            kernel.inputColumnsOrder, Some(parentBlockId))
+          hyIter
+        }
       }
-    }
 
-    val resultIter = kernel.compute[U, T](inputHyIter,
-      Seq(inputColSchema, outputColSchema), None,
-      outputArraySizes, inputFreeVariables, Some(blockId))
+      val resultIter = kernel.compute[U, T](inputHyIter,
+        Seq(inputColSchema, outputColSchema), None,
+        outputArraySizes, inputFreeVariables, Some(blockId))
 
-    resultIter
+      resultIter
+    } else {
+      f(context, split.index, firstParent[T].iterator(split, context))
+    }
   }
 }
 
@@ -315,25 +319,39 @@ object CUDARDDImplicits {
 
       val cleanF = CUDAUtils.cleanFn(sc, f) // sc.clean(f)
 
-      val inputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]
-      val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]
-
-      val reducePartition: (TaskContext, Iterator[T]) => Option[T] =
-        (ctx: TaskContext, data: Iterator[T]) => {
-          data match {
-            case col: HybridIterator[T] =>
-              if (col.numElements != 0) {
-                val colIter = extfunc.compute[T, T](col, Seq(inputColSchema, outputColSchema),
-                  Some(1), outputArraySizes,
-                  inputFreeVariables, None).asInstanceOf[HybridIterator[T]]
-                Some(colIter.next)
-              } else {
-                None
-              }
-            // Handle partitions with no data
-            case _ => None
-          }
+      val reducePartition: (TaskContext, Iterator[T]) => Option[T] = (ctx: TaskContext, data: Iterator[T]) => {
+        if (GPUSparkEnv.get.isGPUEnabled) {
+          val inputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]
+          val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T]
+
+          data match {
+            case col: HybridIterator[T] =>
+              if (col.numElements != 0) {
+                val colIter = extfunc.compute[T, T](col, Seq(inputColSchema, outputColSchema),
+                  Some(1), outputArraySizes,
+                  inputFreeVariables, None).asInstanceOf[HybridIterator[T]]
+                Some(colIter.next)
+              } else {
+                None
+              }
+            // Handle partitions with no data
+            case _ =>
+              if (data.hasNext) {
+                Some(data.reduceLeft(cleanF))
+              }
+              else {
+                None
+              }
+          }
+        } else {
+          if (data.hasNext) {
+            Some(data.reduceLeft(cleanF))
+          }
+          else {
+            None
+          }
         }
+      }
 
       var jobResult: Option[T] = None
       val mergeResult = (index: Int, taskResult: Option[T]) => {
@@ -344,6 +362,7 @@ object CUDARDDImplicits {
           }
         }
       }
+
       sc.runJob(rdd, reducePartition, 0 until rdd.partitions.length, mergeResult)
       jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
     }
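
The reduce path above now gives every partition the same contract whether or not a GPU is present: produce Some(partial result), or None for an empty partition, and let the driver fold the partials. A self-contained sketch of that contract under the same assumptions (reducePartitionCpu and mergeResults are illustrative names, not from the patch):

    // CPU-side per-partition reduce: None marks an empty partition,
    // mirroring the data.reduceLeft(cleanF) fallback branches above.
    def reducePartitionCpu[T](data: Iterator[T], f: (T, T) => T): Option[T] =
      if (data.hasNext) Some(data.reduceLeft(f)) else None

    // Driver-side merge of the per-partition partials, failing on an
    // entirely empty collection exactly as the patched code does.
    def mergeResults[T](partials: Seq[Option[T]], f: (T, T) => T): T =
      partials.flatten.reduceLeftOption(f)
        .getOrElse(throw new UnsupportedOperationException("empty collection"))
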
diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala
index 8e27ff5..3b89737 100644
--- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala
+++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala
@@ -19,7 +19,7 @@ package com.ibm.gpuenabler
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.gpuenabler.CUDAUtils._
-
+import jcuda.runtime.JCuda
 
 private[gpuenabler] class GPUSparkEnv() {
 
@@ -45,7 +45,8 @@ private[gpuenabler] class GPUSparkEnv() {
                                  new GPUMemoryManagerMasterEndPoint(rpcEnv)),
                                  isDriver,
                                  isLocal)
-  val isGPUEnabled = (cudaManager != null)
+  val isGPUEnabled = if (cudaManager != null) cudaManager.isGPUEnabled else false
+  def gpuCount = if (isGPUEnabled) cudaManager.gpuCount else 0
   val isGPUCodeGenEnabled =
     isGPUEnabled && SparkEnv.get.conf.getBoolean("spark.gpu.codegen", false)
 }
@@ -65,6 +66,14 @@ private[gpuenabler] object GPUSparkEnv extends _Logging {
           oldSparkEnv = SparkEnv.get
           initalize()
         }
+
+        if (env.isGPUEnabled) {
+          val executorId = env.executorId match {
+            case "driver" => 0
+            case _ => SparkEnv.get.executorId.toInt
+          }
+          JCuda.cudaSetDevice(executorId % env.gpuCount)
+        }
         env
       }
 }
diff --git a/gpu-enabler/src/test/resources/Makefile b/gpu-enabler/src/test/resources/Makefile
new file mode 100644
index 0000000..4bdfad5
--- /dev/null
+++ b/gpu-enabler/src/test/resources/Makefile
@@ -0,0 +1,15 @@
+CUDA_PATH ?= /usr
+CXX ?= g++
+NVCC ?= $(CUDA_PATH)/bin/nvcc
+COMPUTE_CAPABILITY ?= 35
+CXXFLAGS ?= -m64 -O3 -Xcompiler -Wall --std=c++11 -g
+NVCCFLAGS ?= -arch=sm_$(COMPUTE_CAPABILITY) -Xptxas="-v"
+
+
+all : testCUDAKernels.ptx
+
+testCUDAKernels.ptx: testCUDAKernels.cu
+	$(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ $^
+
+clean:
+	rm -f testCUDAKernels.ptx
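
GPUSparkEnv above pins each executor to a device with JCuda.cudaSetDevice(executorId % gpuCount), so executors on a multi-GPU node spread round-robin and the driver always takes device 0. A pure sketch of that mapping (deviceFor is an illustrative name; it assumes gpuCount > 0, which the isGPUEnabled guard above ensures):

    // Maps a Spark executor id to a CUDA device ordinal, as in
    // GPUSparkEnv.get above: the driver counts as id 0, and numeric
    // executor ids wrap around the available device count.
    def deviceFor(executorId: String, gpuCount: Int): Int = {
      val id = executorId match {
        case "driver" => 0
        case n => n.toInt
      }
      id % gpuCount
    }

    // With 2 GPUs: executors "1", "2", "3", "4" land on devices 1, 0, 1, 0.

The new test Makefile compiles testCUDAKernels.cu to PTX with nvcc; since every variable is assigned with ?=, the compute capability can be overridden from the command line, for example make COMPUTE_CAPABILITY=60.
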