From c9ac3a428532db215da1b6a009ae679dfea12b96 Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Mon, 30 Jan 2017 05:18:01 -0600 Subject: [PATCH 1/2] multi gpu support --- .../com/ibm/gpuenabler/CUDAManager.scala | 51 +++++++++- .../com/ibm/gpuenabler/CUDARDDUtils.scala | 95 +++++++++++-------- .../com/ibm/gpuenabler/GPUSparkEnv.scala | 13 ++- gpu-enabler/src/test/resources/Makefile | 15 +++ 4 files changed, 131 insertions(+), 43 deletions(-) create mode 100644 gpu-enabler/src/test/resources/Makefile diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala index e2ec120..844c2ab 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala @@ -17,8 +17,9 @@ package com.ibm.gpuenabler +import java.util.Date import java.net.URL - +import java.util.concurrent.ConcurrentHashMap import jcuda.Pointer import jcuda.driver.JCudaDriver._ import jcuda.driver.{CUdeviceptr, CUmodule, JCudaDriver} @@ -26,11 +27,21 @@ import jcuda.runtime.JCuda import org.apache.commons.io.IOUtils import org.apache.spark.SparkException import org.slf4j.{Logger, LoggerFactory} - +import java.lang.management.ManagementFactory import scala.collection.mutable.HashMap +import scala.collection.mutable +import java.text.SimpleDateFormat +/* private[gpuenabler] object CUDAManagerCachedModule { - private val cachedModules = new HashMap[(String, Int), CUmodule] + private val cachedModules = new ConcurrentHashMap[(String, Int), CUmodule] + def getInstance() : ConcurrentHashMap[(String, Int), CUmodule] = { cachedModules } +} +*/ + +private[gpuenabler] object CUDAManagerCachedModule { +// val lock:Object = new Object + private val cachedModules = new HashMap[(String, Int), CUmodule] def getInstance() : HashMap[(String, Int), CUmodule] = { cachedModules } } @@ -38,21 +49,46 @@ private[gpuenabler] class CUDAManager { // Initialization // This is supposed to be called before ANY other JCuda* call to ensure we have properly loaded // native jCuda library and cuda context +<<<<<<< HEAD try { JCudaDriver.setExceptionsEnabled(true) JCudaDriver.cuInit(0) +======= + val executorId = SparkEnv.get.executorId match { + case "driver" => 0 + case _ => SparkEnv.get.executorId.toInt + } + val dateFormatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS") + + var isGPUEnabled = false + + try { + JCudaDriver.setExceptionsEnabled(true) + JCudaDriver.cuInit(0) + isGPUEnabled = true +>>>>>>> 1826bb2... 
handle nodes with no GPU attached } catch { + case ex: UnsatisfiedLinkError => println("Native CUDA libraries not detected.") + case ex: NoClassDefFoundError => println("Native CUDA libraries not detected.") +/* case ex: UnsatisfiedLinkError => throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " + "not detected - make sure Driver and Executors are able to load them", ex) case ex: NoClassDefFoundError => throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " + "not detected - make sure Driver and Executors are able to load them", ex) +*/ case ex: Throwable => throw new SparkException("Could not initialize CUDA because of unknown reason", ex) } + def gpuCount = { + val count = new Array[Int](1) + cuDeviceGetCount(count) + count(0) + } + // private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = { // TODO : change it back to private after development def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = { @@ -71,14 +107,22 @@ private[gpuenabler] class CUDAManager { val devIx = new Array[Int](1) JCuda.cudaGetDevice(devIx) +<<<<<<< HEAD +======= + +>>>>>>> 1826bb2... handle nodes with no GPU attached synchronized { // Since multiple modules cannot be loaded into one context in runtime API, // we use singleton cache http://stackoverflow.com/questions/32502375/ // loading-multiple-modules-in-jcuda-is-not-working // TODO support loading multiple ptxs // http://stackoverflow.com/questions/32535828/jit-in-jcuda-loading-multiple-ptx-modules + CUDAManagerCachedModule.getInstance.getOrElseUpdate((key, devIx(0)), { +<<<<<<< HEAD // println(" MODULE LOAD ") +======= +>>>>>>> 1826bb2... handle nodes with no GPU attached // TODO maybe unload the module if it won't be needed later var moduleBinaryData: Array[Byte] = null if (resourceURL != null) { @@ -94,6 +138,7 @@ private[gpuenabler] class CUDAManager { moduleBinaryData0(moduleBinaryData.length) = 0 val module = new CUmodule JCudaDriver.cuModuleLoadData(module, moduleBinaryData0) + CUDAManagerCachedModule.getInstance.put((key, devIx(0)), module) module }) } diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala index 01e8215..ec6e496 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDARDDUtils.scala @@ -85,31 +85,35 @@ private[gpuenabler] class MapGPUPartitionsRDD[U: ClassTag, T: ClassTag]( private val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[U] override def compute(split: Partition, context: TaskContext): Iterator[U] = { - // Use the block ID of this particular (rdd, partition) - val blockId = RDDBlockId(this.id, split.index) + if (GPUSparkEnv.get.isGPUEnabled) { + // Use the block ID of this particular (rdd, partition) + val blockId = RDDBlockId(this.id, split.index) - // Handle empty partitions. - if (firstParent[T].iterator(split, context).length <= 0) - return new Array[U](0).toIterator + // Handle empty partitions. 
+ if (firstParent[T].iterator(split, context).length <= 0) + return new Array[U](0).toIterator - val inputHyIter = firstParent[T].iterator(split, context) match { - case hyIter: HybridIterator[T] => { - hyIter - } - case iter: Iterator[T] => { - // println("Converting Regular Iterator to hybridIterator") - val parentBlockId = RDDBlockId(firstParent[T].id, split.index) - val hyIter = new HybridIterator[T](iter.toArray, inputColSchema, - kernel.inputColumnsOrder, Some(parentBlockId)) - hyIter + val inputHyIter = firstParent[T].iterator(split, context) match { + case hyIter: HybridIterator[T] => { + hyIter + } + case iter: Iterator[T] => { + // println("Converting Regular Iterator to hybridIterator") + val parentBlockId = RDDBlockId(firstParent[T].id, split.index) + val hyIter = new HybridIterator[T](iter.toArray, inputColSchema, + kernel.inputColumnsOrder, Some(parentBlockId)) + hyIter + } } - } - val resultIter = kernel.compute[U, T](inputHyIter, - Seq(inputColSchema, outputColSchema), None, - outputArraySizes, inputFreeVariables, Some(blockId)) + val resultIter = kernel.compute[U, T](inputHyIter, + Seq(inputColSchema, outputColSchema), None, + outputArraySizes, inputFreeVariables, Some(blockId)) - resultIter + resultIter + } else { + f(context, split.index, firstParent[T].iterator(split, context)) + } } } @@ -315,25 +319,39 @@ object CUDARDDImplicits { val cleanF = CUDAUtils.cleanFn(sc, f) // sc.clean(f) - val inputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T] - val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T] - - val reducePartition: (TaskContext, Iterator[T]) => Option[T] = - (ctx: TaskContext, data: Iterator[T]) => { - data match { - case col: HybridIterator[T] => - if (col.numElements != 0) { - val colIter = extfunc.compute[T, T](col, Seq(inputColSchema, outputColSchema), - Some(1), outputArraySizes, - inputFreeVariables, None).asInstanceOf[HybridIterator[T]] - Some(colIter.next) - } else { - None - } - // Handle partitions with no data - case _ => None - } + val reducePartition: (TaskContext, Iterator[T]) => Option[T] = (ctx: TaskContext, data: Iterator[T]) => { + if (GPUSparkEnv.get.isGPUEnabled) { + val inputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T] + val outputColSchema: ColumnPartitionSchema = ColumnPartitionSchema.schemaFor[T] + + data match { + case col: HybridIterator[T] => + if (col.numElements != 0) { + val colIter = extfunc.compute[T, T](col, Seq(inputColSchema, outputColSchema), + Some(1), outputArraySizes, + inputFreeVariables, None).asInstanceOf[HybridIterator[T]] + Some(colIter.next) + } else { + None + } + // Handle partitions with no data + case _ => + if (data.hasNext) { + Some(data.reduceLeft(cleanF)) + } + else { + None + } + } + } else { + if (data.hasNext) { + Some(data.reduceLeft(cleanF)) + } + else { + None + } } + } var jobResult: Option[T] = None val mergeResult = (index: Int, taskResult: Option[T]) => { @@ -344,6 +362,7 @@ object CUDARDDImplicits { } } } + sc.runJob(rdd, reducePartition, 0 until rdd.partitions.length, mergeResult) jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) } diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala index 8e27ff5..3b89737 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala @@ -19,7 +19,7 @@ package com.ibm.gpuenabler import 
org.apache.spark.SparkEnv import org.apache.spark.gpuenabler.CUDAUtils._ - +import jcuda.runtime.JCuda private[gpuenabler] class GPUSparkEnv() { @@ -45,7 +45,8 @@ private[gpuenabler] class GPUSparkEnv() { new GPUMemoryManagerMasterEndPoint(rpcEnv)), isDriver, isLocal) - val isGPUEnabled = (cudaManager != null) + val isGPUEnabled = if (cudaManager != null) cudaManager.isGPUEnabled else false + def gpuCount = if (isGPUEnabled) cudaManager.gpuCount else 0 val isGPUCodeGenEnabled = isGPUEnabled && SparkEnv.get.conf.getBoolean("spark.gpu.codegen", false) } @@ -65,6 +66,14 @@ private[gpuenabler] object GPUSparkEnv extends _Logging { oldSparkEnv = SparkEnv.get initalize() } + + if (env.isGPUEnabled) { + val executorId = env.executorId match { + case "driver" => 0 + case _ => SparkEnv.get.executorId.toInt + } + JCuda.cudaSetDevice(executorId % env.gpuCount ) + } env } } diff --git a/gpu-enabler/src/test/resources/Makefile b/gpu-enabler/src/test/resources/Makefile new file mode 100644 index 0000000..4bdfad5 --- /dev/null +++ b/gpu-enabler/src/test/resources/Makefile @@ -0,0 +1,15 @@ +CUDA_PATH ?= /usr +CXX ?= g++ +NVCC ?= $(CUDA_PATH)/bin/nvcc +COMPUTE_CAPABILITY ?= 35 +CXXFLAGS ?= -m64 -O3 -Xcompiler -Wall --std=c++11 -g +NVCCFLAGS ?= -arch=sm_$(COMPUTE_CAPABILITY) -Xptxas="-v" + + +all : testCUDAKernels.ptx + +testCUDAKernels.ptx: testCUDAKernels.cu + $(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ -c $^ + +clean: + rm -f testCUDAKernels.ptx From 27d951036c5799522fbb4b08bf49ceb9da925aa1 Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Mon, 30 Jan 2017 07:28:16 -0600 Subject: [PATCH 2/2] remove comments --- .../com/ibm/gpuenabler/CUDAManager.scala | 53 ++++--------------- 1 file changed, 9 insertions(+), 44 deletions(-) diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala index 844c2ab..c6537c6 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala @@ -19,7 +19,6 @@ package com.ibm.gpuenabler import java.util.Date import java.net.URL -import java.util.concurrent.ConcurrentHashMap import jcuda.Pointer import jcuda.driver.JCudaDriver._ import jcuda.driver.{CUdeviceptr, CUmodule, JCudaDriver} @@ -27,20 +26,11 @@ import jcuda.runtime.JCuda import org.apache.commons.io.IOUtils import org.apache.spark.SparkException import org.slf4j.{Logger, LoggerFactory} -import java.lang.management.ManagementFactory import scala.collection.mutable.HashMap import scala.collection.mutable import java.text.SimpleDateFormat -/* -private[gpuenabler] object CUDAManagerCachedModule { - private val cachedModules = new ConcurrentHashMap[(String, Int), CUmodule] - def getInstance() : ConcurrentHashMap[(String, Int), CUmodule] = { cachedModules } -} -*/ - private[gpuenabler] object CUDAManagerCachedModule { -// val lock:Object = new Object private val cachedModules = new HashMap[(String, Int), CUmodule] def getInstance() : HashMap[(String, Int), CUmodule] = { cachedModules } } @@ -49,41 +39,24 @@ private[gpuenabler] class CUDAManager { // Initialization // This is supposed to be called before ANY other JCuda* call to ensure we have properly loaded // native jCuda library and cuda context -<<<<<<< HEAD - try { - JCudaDriver.setExceptionsEnabled(true) - JCudaDriver.cuInit(0) -======= - val executorId = SparkEnv.get.executorId match { - case "driver" => 0 - case _ => SparkEnv.get.executorId.toInt - } - val dateFormatter = new 
SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS") - - var isGPUEnabled = false - + private[gpuenabler] var isGPUEnabled = false try { JCudaDriver.setExceptionsEnabled(true) JCudaDriver.cuInit(0) isGPUEnabled = true ->>>>>>> 1826bb2... handle nodes with no GPU attached } catch { - case ex: UnsatisfiedLinkError => println("Native CUDA libraries not detected.") - case ex: NoClassDefFoundError => println("Native CUDA libraries not detected.") -/* - case ex: UnsatisfiedLinkError => - throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " + - "not detected - make sure Driver and Executors are able to load them", ex) + case ex: UnsatisfiedLinkError => + CUDAManager.logger.info("Could not initialize CUDA, because native jCuda libraries were " + + "not detected - continue to use CPU for execution") case ex: NoClassDefFoundError => - throw new SparkException("Could not initialize CUDA, because native jCuda libraries were " + - "not detected - make sure Driver and Executors are able to load them", ex) -*/ - + CUDAManager.logger.info("Could not initialize CUDA, because native jCuda libraries were " + + "not detected - continue to use CPU for execution") case ex: Throwable => throw new SparkException("Could not initialize CUDA because of unknown reason", ex) } - def gpuCount = { + // Returns the number of GPUs available in this node + private[gpuenabler] def gpuCount = { val count = new Array[Int](1) cuDeviceGetCount(count) count(0) @@ -91,7 +64,7 @@ private[gpuenabler] class CUDAManager { // private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = { // TODO : change it back to private after development - def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = { + private[gpuenabler] def cachedLoadModule(resource: Either[URL, (String, String)]): CUmodule = { var resourceURL: URL = null var key: String = null var ptxString: String = null @@ -107,22 +80,14 @@ private[gpuenabler] class CUDAManager { val devIx = new Array[Int](1) JCuda.cudaGetDevice(devIx) -<<<<<<< HEAD -======= ->>>>>>> 1826bb2... handle nodes with no GPU attached synchronized { // Since multiple modules cannot be loaded into one context in runtime API, // we use singleton cache http://stackoverflow.com/questions/32502375/ // loading-multiple-modules-in-jcuda-is-not-working // TODO support loading multiple ptxs // http://stackoverflow.com/questions/32535828/jit-in-jcuda-loading-multiple-ptx-modules - CUDAManagerCachedModule.getInstance.getOrElseUpdate((key, devIx(0)), { -<<<<<<< HEAD - // println(" MODULE LOAD ") -======= ->>>>>>> 1826bb2... handle nodes with no GPU attached // TODO maybe unload the module if it won't be needed later var moduleBinaryData: Array[Byte] = null if (resourceURL != null) {
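
Note: the device-selection behaviour these patches introduce reduces to the short standalone sketch below. It is illustrative only, not code from the patch (the object and method names GpuDeviceSelectionSketch and selectDevice are hypothetical); it uses the same jCuda calls the patch already relies on, cuInit and cuDeviceGetCount to count devices and JCuda.cudaSetDevice to bind an executor to executorId % gpuCount, falling back to the CPU path when the native libraries are missing.

    import jcuda.driver.JCudaDriver
    import jcuda.runtime.JCuda

    // Illustrative sketch only; not part of the patch.
    object GpuDeviceSelectionSketch {

      // Number of CUDA devices visible on this node, or 0 when the native
      // jCuda libraries cannot be loaded (mirrors CUDAManager.isGPUEnabled).
      def gpuCount(): Int =
        try {
          JCudaDriver.setExceptionsEnabled(true)
          JCudaDriver.cuInit(0)
          val count = new Array[Int](1)
          JCudaDriver.cuDeviceGetCount(count)
          count(0)
        } catch {
          case _: UnsatisfiedLinkError | _: NoClassDefFoundError => 0
        }

      // Map a Spark executor id ("driver" or a numeric string) onto a device
      // index, as GPUSparkEnv does with executorId % gpuCount, and bind the
      // calling thread to that device. Returns None when no GPU is attached.
      def selectDevice(executorId: String): Option[Int] = {
        val numericId = if (executorId == "driver") 0 else executorId.toInt
        val count = gpuCount()
        if (count > 0) {
          val device = numericId % count
          JCuda.cudaSetDevice(device)
          Some(device)
        } else {
          None
        }
      }
    }

Round-robin assignment keeps the executors on a node spread across its GPUs without any coordination beyond the executor id itself, which is why GPUSparkEnv can apply it lazily the first time the environment is fetched.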