diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala
index c741506f977..f84faa600e5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala
@@ -162,6 +162,8 @@ object GpuDeviceManager extends Logging {
     }
   }
 
+  private def toKB(x: Long): Double = x / 1024.0
+
   private def toMB(x: Long): Double = x / 1024 / 1024.0
 
   private def computeRmmInitSizes(conf: RapidsConf, info: CudaMemInfo): (Long, Long) = {
@@ -263,9 +265,19 @@ object GpuDeviceManager extends Logging {
       logInfo("Using legacy default stream")
     }
 
+    val (allocationAlignment, alignmentThreshold) =
+      if (conf.isGdsSpillEnabled && conf.isGdsSpillAlignedIO) {
+        logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " +
+          s"alignment threshold = ${toKB(conf.gdsSpillAlignmentThreshold)} KB")
+        (RapidsGdsStore.AllocationAlignment, conf.gdsSpillAlignmentThreshold)
+      } else {
+        (0L, 0L)
+      }
+
     try {
       Cuda.setDevice(gpuId)
-      Rmm.initialize(init, logConf, initialAllocation, maxAllocation)
+      Rmm.initialize(
+        init, logConf, initialAllocation, maxAllocation, allocationAlignment, alignmentThreshold)
       RapidsBufferCatalog.init(conf)
       GpuShuffleEnv.init(conf)
     } catch {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala
index 40191b34b86..72968057b94 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala
@@ -181,7 +181,8 @@ object RapidsBufferCatalog extends Logging with Arm {
     deviceStorage = new RapidsDeviceMemoryStore()
     val diskBlockManager = new RapidsDiskBlockManager(conf)
     if (rapidsConf.isGdsSpillEnabled) {
-      gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize)
+      gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize,
+        rapidsConf.isGdsSpillAlignedIO, rapidsConf.gdsSpillAlignmentThreshold)
       deviceStorage.setSpillStore(gdsStorage)
     } else {
       hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize)
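When aligned I/O is not in play, passing `(0L, 0L)` presumably leaves alignment off. For intuition, here is a minimal sketch, assuming the 4 KiB alignment and the default 64 KiB threshold from this patch, of the size rounding an aligned resource adapter is expected to apply; the real logic lives in RMM's `aligned_resource_adapter`, not in this Scala:

```scala
object AlignedAllocSketch {
  val alignment = 4096L  // RapidsGdsStore.AllocationAlignment
  val threshold = 65536L // default gdsSpillAlignmentThreshold

  // Requests at or above the threshold are padded up to the next 4 KiB
  // boundary; the returned base pointer is likewise 4 KiB aligned.
  def effectiveSize(requested: Long): Long =
    if (requested >= threshold) (requested + alignment - 1) & ~(alignment - 1)
    else requested

  def main(args: Array[String]): Unit = {
    println(effectiveSize(1024L))  // 1024: below the threshold, unpadded
    println(effectiveSize(65536L)) // 65536: already a 4 KiB multiple
    println(effectiveSize(65537L)) // 69632: padded to the next boundary
  }
}
```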
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index a33d58bbeb9..b7d71a82265 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -378,6 +378,24 @@ object RapidsConf {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefault(ByteUnit.MiB.toBytes(8))
 
+  val GDS_SPILL_ALIGNED_IO =
+    conf("spark.rapids.memory.gpu.direct.storage.spill.alignedIO")
+      .doc("When GDS spill is enabled, should I/O be 4 KiB aligned. GDS is more efficient " +
+        "when reads and writes are 4 KiB aligned, but aligning adds some memory overhead " +
+        "due to padding.")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
+  val GDS_SPILL_ALIGNMENT_THRESHOLD =
+    conf("spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold")
+      .doc("GPU memory buffers with a size above this threshold will be aligned to 4 KiB. " +
+        "Setting this value to 0 means every allocation will be 4 KiB aligned. A low " +
+        "threshold may cause more memory consumption because of padding.")
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(ByteUnit.KiB.toBytes(64))
+
   val POOLED_MEM = conf("spark.rapids.memory.gpu.pooling.enabled")
     .doc("Should RMM act as a pooling allocator for GPU memory, or should it just pass " +
       "through to CUDA memory allocation directly. DEPRECATED: please use " +
@@ -1373,6 +1391,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val gdsSpillBatchWriteBufferSize: Long = get(GDS_SPILL_BATCH_WRITE_BUFFER_SIZE)
 
+  lazy val isGdsSpillAlignedIO: Boolean = get(GDS_SPILL_ALIGNED_IO)
+
+  lazy val gdsSpillAlignmentThreshold: Long = get(GDS_SPILL_ALIGNMENT_THRESHOLD)
+
   lazy val hasNans: Boolean = get(HAS_NANS)
 
   lazy val gpuTargetBatchSizeBytes: Long = get(GPU_BATCH_SIZE_BYTES)
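Both settings are internal, but they can be overridden like any other conf for experiments. A hypothetical setup, shown here with the default values:

```scala
import org.apache.spark.SparkConf

// Illustrative only: these internal keys already default to aligned I/O on
// with a 64 KiB (65536 byte) threshold.
val conf = new SparkConf()
  .set("spark.rapids.memory.gpu.direct.storage.spill.alignedIO", "true")
  .set("spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold", "65536")
```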
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala
index bf17c11c33a..16ce2b42ec2 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala
@@ -28,10 +28,30 @@ import com.nvidia.spark.rapids.format.TableMeta
 
 import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId}
 
-/** A buffer store using GPUDirect Storage (GDS). */
+/**
+ * A buffer store using GPUDirect Storage (GDS).
+ *
+ * GDS is more efficient when IO is aligned.
+ *
+ * An IO is unaligned if one of the following conditions is true:
+ * - The file_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned.
+ * - The size that was issued in cuFileRead/cuFileWrite is not 4K aligned.
+ * - The devPtr_base that was issued in cuFileRead/cuFileWrite is not 4K aligned.
+ * - The devPtr_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned.
+ *
+ * To avoid unaligned IO, when GDS spilling is enabled, the RMM `aligned_resource_adapter` is
+ * used so that large buffers above a certain size threshold are allocated with a 4K-aligned
+ * base pointer and size.
+ *
+ * When reading and writing these large buffers through GDS, the size is aligned up to the next
+ * 4K boundary. Although the aligned size appears to be out of bounds, the extra space needed is
+ * held in reserve by the RMM `aligned_resource_adapter`.
+ */
 class RapidsGdsStore(
     diskBlockManager: RapidsDiskBlockManager,
     batchWriteBufferSize: Long,
+    alignedIO: Boolean,
+    alignmentThreshold: Long,
     catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
     extends RapidsBufferStore(StorageTier.GDS, catalog) with Arm {
   private[this] val batchSpiller = new BatchSpiller()
@@ -51,6 +71,23 @@ class RapidsGdsStore(
     }
   }
 
+  private def alignBufferSize(buffer: DeviceMemoryBuffer): Long = {
+    if (alignedIO) {
+      RapidsGdsStore.alignUp(buffer.getLength)
+    } else {
+      buffer.getLength
+    }
+  }
+
+  private def alignLargeBufferSize(buffer: DeviceMemoryBuffer): Long = {
+    val length = buffer.getLength
+    if (alignedIO && length >= alignmentThreshold) {
+      RapidsGdsStore.alignUp(length)
+    } else {
+      length
+    }
+  }
+
   abstract class RapidsGdsBuffer(
       override val id: RapidsBufferId,
       override val size: Long,
@@ -70,7 +107,8 @@ class RapidsGdsStore(
 
     override def materializeMemoryBuffer: MemoryBuffer = {
       closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer =>
-        CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
+        CuFile.readFileToDeviceMemory(
+          buffer.getAddress, alignLargeBufferSize(buffer), path, fileOffset)
         logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
         buffer
       }
@@ -83,8 +121,10 @@ class RapidsGdsStore(
         val dm = dmOriginal.slice(dstOffset, length)
         // TODO: switch to async API when it's released, using the passed in CUDA stream.
         stream.sync()
+        // TODO: align the reads once https://github.com/NVIDIA/spark-rapids/issues/2492 is
+        // resolved.
         CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
-        logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
+        logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS")
       case _ => throw new IllegalStateException(
         s"GDS can only copy to device buffer, not ${dst.getClass}")
     }
@@ -102,17 +142,18 @@ class RapidsGdsStore(
   }
 
   private def singleShotSpill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer)
-    : RapidsBufferBase = {
+      : RapidsBufferBase = {
     val id = other.id
     val path = id.getDiskPath(diskBlockManager)
+    val alignedSize = alignLargeBufferSize(deviceBuffer)
     // When sharing files, append to the file; otherwise, write from the beginning.
     val fileOffset = if (id.canShareDiskPaths) {
       // only one writer at a time for now when using shared files
       path.synchronized {
-        CuFile.appendDeviceBufferToFile(path, deviceBuffer)
+        CuFile.appendDeviceMemoryToFile(path, deviceBuffer.getAddress, alignedSize)
       }
     } else {
-      CuFile.writeDeviceBufferToFile(path, 0, deviceBuffer)
+      CuFile.writeDeviceMemoryToFile(path, 0, deviceBuffer.getAddress, alignedSize)
       0
     }
     logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS")
@@ -121,7 +162,6 @@ class RapidsGdsStore(
   }
 
   class BatchSpiller() {
-    private val blockSize = 4096
     private[this] val spilledBuffers = new ConcurrentHashMap[File, Set[RapidsBufferId]]
     private[this] val pendingBuffers = ArrayBuffer.empty[RapidsGdsBatchedBuffer]
     private[this] val batchWriteBuffer = CuFileBuffer.allocate(batchWriteBufferSize, true)
@@ -130,10 +170,11 @@ class RapidsGdsStore(
 
     def spill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer): RapidsBufferBase =
       this.synchronized {
-        if (deviceBuffer.getLength > batchWriteBufferSize - currentOffset) {
+        val alignedSize = alignBufferSize(deviceBuffer)
+        if (alignedSize > batchWriteBufferSize - currentOffset) {
           val path = currentFile.getAbsolutePath
           withResource(new CuFileWriteHandle(path)) { handle =>
-            handle.write(batchWriteBuffer, currentOffset, 0)
+            handle.write(batchWriteBuffer, batchWriteBufferSize, 0)
             logDebug(s"Spilled to $path 0:$currentOffset via GDS")
           }
           pendingBuffers.foreach(_.unsetPending())
@@ -149,15 +190,11 @@ class RapidsGdsStore(
       addBuffer(currentFile, id)
       val gdsBuffer = new RapidsGdsBatchedBuffer(id, currentFile, currentOffset, other.size,
         other.meta, other.getSpillPriority, other.spillCallback)
-      currentOffset += alignUp(deviceBuffer.getLength)
+      currentOffset += alignedSize
      pendingBuffers += gdsBuffer
       gdsBuffer
     }
 
-    private def alignUp(length: Long): Long = {
-      (length + blockSize - 1) & ~(blockSize - 1)
-    }
-
     private def copyToBuffer(
         buffer: MemoryBuffer, offset: Long, size: Long, stream: Cuda.Stream): Unit = {
       buffer.copyFromMemoryBuffer(0, batchWriteBuffer, offset, size, stream)
@@ -208,7 +245,8 @@ class RapidsGdsStore(
           Cuda.DEFAULT_STREAM.sync()
           logDebug(s"Created device buffer $size from batch write buffer")
         } else {
-          CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
+          CuFile.readFileToDeviceMemory(
+            buffer.getAddress, alignLargeBufferSize(buffer), path, fileOffset)
           logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
         }
         buffer
@@ -227,8 +265,10 @@ class RapidsGdsStore(
         } else {
           // TODO: switch to async API when it's released, using the passed in CUDA stream.
           stream.sync()
+          // TODO: align the reads once https://github.com/NVIDIA/spark-rapids/issues/2492 is
+          // resolved.
           CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
-          logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
+          logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS")
         }
       case _ => throw new IllegalStateException(
         s"GDS can only copy to device buffer, not ${dst.getClass}")
@@ -252,4 +292,12 @@ class RapidsGdsStore(
       }
     }
   }
-}
\ No newline at end of file
+}
+
+object RapidsGdsStore {
+  val AllocationAlignment = 4096L
+
+  def alignUp(length: Long): Long = {
+    (length + AllocationAlignment - 1) & ~(AllocationAlignment - 1)
+  }
+}
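The `alignUp` bit trick, now hosted in the companion object, relies on `AllocationAlignment` being a power of two: masking with `~(AllocationAlignment - 1)` clears the low 12 bits, and adding `AllocationAlignment - 1` first makes that truncation round up instead of down. A few worked values:

```scala
val alignment = 4096L // RapidsGdsStore.AllocationAlignment
def alignUp(length: Long): Long = (length + alignment - 1) & ~(alignment - 1)

assert(alignUp(0L) == 0L)       // zero stays zero
assert(alignUp(1L) == 4096L)    // rounds up to a full block
assert(alignUp(4096L) == 4096L) // exact multiples are unchanged
assert(alignUp(4097L) == 8192L) // one byte over spills into the next block
```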
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
index 2690d3084fb..eeaa02e2d0a 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
@@ -66,7 +66,7 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with
     val batchWriteBufferSize = 16384 // Holds 2 buffers.
     withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
       withResource(new RapidsGdsStore(
-        diskBlockManager, batchWriteBufferSize, catalog)) { gdsStore =>
+        diskBlockManager, batchWriteBufferSize, false, 65536, catalog)) { gdsStore =>
         devStore.setSpillStore(gdsStore)
 
         assertResult(0)(gdsStore.currentSize)
@@ -111,7 +111,8 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with
     val spillPriority = -7
     val catalog = spy(new RapidsBufferCatalog)
     withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
-      withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, catalog)) { gdsStore =>
+      withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, false, 65536, catalog)) {
+        gdsStore =>
         devStore.setSpillStore(gdsStore)
         assertResult(0)(gdsStore.currentSize)
         val bufferSize = addTableToStore(devStore, bufferId, spillPriority)
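To see how the aligned sizes interact with the batch writer, here is a toy model of the `BatchSpiller` offset accounting only (not the store itself), using the 16 KiB batch buffer from the first test above:

```scala
// Toy model: track where the next batched spill would land in the
// CuFileBuffer, flushing when an aligned buffer no longer fits.
val batchWriteBufferSize = 16384L
var currentOffset = 0L

def batchSpill(length: Long): Unit = {
  val alignedSize = (length + 4095L) & ~4095L
  if (alignedSize > batchWriteBufferSize - currentOffset) {
    // Flush the whole buffer so the GDS write size itself stays aligned,
    // mirroring handle.write(batchWriteBuffer, batchWriteBufferSize, 0).
    println(s"flush 0:$currentOffset, then start a new batch file")
    currentOffset = 0L
  }
  currentOffset += alignedSize
}

batchSpill(5000L) // padded to 8192, currentOffset = 8192
batchSpill(5000L) // padded to 8192, currentOffset = 16384 (buffer full)
batchSpill(1L)    // triggers a flush, then currentOffset = 4096
```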