From c4c6b7e1454f1c665ce69d693960b33fed0769f3 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 20 May 2021 13:45:37 -0700 Subject: [PATCH 1/7] align GDS reads/writes to 4 KiB Signed-off-by: Rong Ou --- .../spark/rapids/GpuDeviceManager.scala | 13 +++++- .../nvidia/spark/rapids/RapidsGdsStore.scala | 45 +++++++++++++------ 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index c741506f977..1259e5936ee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -162,6 +162,8 @@ object GpuDeviceManager extends Logging { } } + private def toKB(x: Long): Double = x / 1024.0 + private def toMB(x: Long): Double = x / 1024 / 1024.0 private def computeRmmInitSizes(conf: RapidsConf, info: CudaMemInfo): (Long, Long) = { @@ -263,9 +265,18 @@ object GpuDeviceManager extends Logging { logInfo("Using legacy default stream") } + val (allocationAlignment, alignmentThreshold) = if (conf.isGdsSpillEnabled) { + logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " + + s"alignment threshold = ${toKB(RapidsGdsStore.AlignmentThreshold)} KB") + (RapidsGdsStore.AllocationAlignment, RapidsGdsStore.AlignmentThreshold) + } else { + (0L, 0L) + } + try { Cuda.setDevice(gpuId) - Rmm.initialize(init, logConf, initialAllocation, maxAllocation) + Rmm.initialize( + init, logConf, initialAllocation, maxAllocation, allocationAlignment, alignmentThreshold) RapidsBufferCatalog.init(conf) GpuShuffleEnv.init(conf) } catch { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index bf17c11c33a..73958d1cf28 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -70,7 +70,8 @@ class RapidsGdsStore( override def materializeMemoryBuffer: MemoryBuffer = { closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer => - CuFile.readFileToDeviceBuffer(buffer, path, fileOffset) + CuFile.readFileToDeviceMemory( + buffer.getAddress, RapidsGdsStore.alignBufferSize(buffer), path, fileOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") buffer } @@ -83,7 +84,8 @@ class RapidsGdsStore( val dm = dmOriginal.slice(dstOffset, length) // TODO: switch to async API when it's released, using the passed in CUDA stream. stream.sync() - CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset) + CuFile.readFileToDeviceMemory( + dm.getAddress, RapidsGdsStore.alignBufferSize(dm), path, fileOffset + srcOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") case _ => throw new IllegalStateException( s"GDS can only copy to device buffer, not ${dst.getClass}") @@ -102,17 +104,18 @@ class RapidsGdsStore( } private def singleShotSpill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer) - : RapidsBufferBase = { + : RapidsBufferBase = { val id = other.id val path = id.getDiskPath(diskBlockManager) + val alignedSize = RapidsGdsStore.alignBufferSize(deviceBuffer) // When sharing files, append to the file; otherwise, write from the beginning. 
val fileOffset = if (id.canShareDiskPaths) { // only one writer at a time for now when using shared files path.synchronized { - CuFile.appendDeviceBufferToFile(path, deviceBuffer) + CuFile.appendDeviceMemoryToFile(path, deviceBuffer.getAddress, alignedSize) } } else { - CuFile.writeDeviceBufferToFile(path, 0, deviceBuffer) + CuFile.writeDeviceMemoryToFile(path, 0, deviceBuffer.getAddress, alignedSize) 0 } logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS") @@ -121,7 +124,6 @@ class RapidsGdsStore( } class BatchSpiller() { - private val blockSize = 4096 private[this] val spilledBuffers = new ConcurrentHashMap[File, Set[RapidsBufferId]] private[this] val pendingBuffers = ArrayBuffer.empty[RapidsGdsBatchedBuffer] private[this] val batchWriteBuffer = CuFileBuffer.allocate(batchWriteBufferSize, true) @@ -149,15 +151,11 @@ class RapidsGdsStore( addBuffer(currentFile, id) val gdsBuffer = new RapidsGdsBatchedBuffer(id, currentFile, currentOffset, other.size, other.meta, other.getSpillPriority, other.spillCallback) - currentOffset += alignUp(deviceBuffer.getLength) + currentOffset += RapidsGdsStore.alignUp(deviceBuffer.getLength) pendingBuffers += gdsBuffer gdsBuffer } - private def alignUp(length: Long): Long = { - (length + blockSize - 1) & ~(blockSize - 1) - } - private def copyToBuffer( buffer: MemoryBuffer, offset: Long, size: Long, stream: Cuda.Stream): Unit = { buffer.copyFromMemoryBuffer(0, batchWriteBuffer, offset, size, stream) @@ -208,7 +206,8 @@ class RapidsGdsStore( Cuda.DEFAULT_STREAM.sync() logDebug(s"Created device buffer $size from batch write buffer") } else { - CuFile.readFileToDeviceBuffer(buffer, path, fileOffset) + CuFile.readFileToDeviceMemory( + buffer.getAddress, RapidsGdsStore.alignBufferSize(buffer), path, fileOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") } buffer @@ -227,7 +226,8 @@ class RapidsGdsStore( } else { // TODO: switch to async API when it's released, using the passed in CUDA stream. 
stream.sync() - CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset) + CuFile.readFileToDeviceMemory( + dm.getAddress, RapidsGdsStore.alignBufferSize(dm), path, fileOffset + srcOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") } case _ => throw new IllegalStateException( @@ -252,4 +252,21 @@ class RapidsGdsStore( } } } -} \ No newline at end of file +} + +object RapidsGdsStore { + val AllocationAlignment = 4096L + val AlignmentThreshold = 65536L + + def alignUp(length: Long): Long = { + (length + AllocationAlignment - 1) & ~(AllocationAlignment - 1) + } + + def alignBufferSize(buffer: DeviceMemoryBuffer): Long = { + if (buffer.getLength < AlignmentThreshold) { + buffer.getLength + } else { + alignUp(buffer.getLength) + } + } +} From 9c854c7e05a71468fe6f6fe9dce0fb10994af7d3 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 20 May 2021 15:12:14 -0700 Subject: [PATCH 2/7] extract variable Signed-off-by: Rong Ou --- .../scala/com/nvidia/spark/rapids/RapidsGdsStore.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 73958d1cf28..6d36994e493 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -263,10 +263,11 @@ object RapidsGdsStore { } def alignBufferSize(buffer: DeviceMemoryBuffer): Long = { - if (buffer.getLength < AlignmentThreshold) { - buffer.getLength + val length = buffer.getLength + if (length < AlignmentThreshold) { + length } else { - alignUp(buffer.getLength) + alignUp(length) } } } From 0d577cc00aceefb86fb0312ed4e077cc1e93992e Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 20 May 2021 16:40:50 -0700 Subject: [PATCH 3/7] add comments for alignment Signed-off-by: Rong Ou --- .../nvidia/spark/rapids/RapidsGdsStore.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 6d36994e493..1d3608acf37 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -28,7 +28,25 @@ import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId} -/** A buffer store using GPUDirect Storage (GDS). */ +/** + * A buffer store using GPUDirect Storage (GDS). + * + * GDS is more efficient when IO is aligned. + * + * An IO is unaligned if one of the following conditions is true: + * - The file_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned. + * - The size that was issued in cuFileRead/cuFileWrite is not 4K aligned. + * - The devPtr_base that was issued in cuFileRead/cuFileWrite is not 4K aligned. + * - The devPtr_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned. + * + * To avoid unaligned IO, when GDS spilling is enabled, the RMM `aligned_resource_adapter` is used + * so that large buffers above certain size threshold are allocated with 4K aligned base pointer + * and size. + * + * When reading and writing these large buffers through GDS, the size is aligned up to the next 4K + * boundary. 
Although the aligned size appears to be out of bound, the extra space needed is held + * in reserve by the RMM `aligned_resource_adapter`. + */ class RapidsGdsStore( diskBlockManager: RapidsDiskBlockManager, batchWriteBufferSize: Long, From 5762934a40864063081f31123c8c375f539c9ba8 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Fri, 21 May 2021 16:50:09 -0700 Subject: [PATCH 4/7] add unaligned fallback Signed-off-by: Rong Ou --- .../nvidia/spark/rapids/RapidsGdsStore.scala | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 1d3608acf37..ac2d4fc9a03 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -100,11 +100,17 @@ class RapidsGdsStore( dst match { case dmOriginal: DeviceMemoryBuffer => val dm = dmOriginal.slice(dstOffset, length) + val alignedSize = if (RapidsGdsStore.isAligned(srcOffset)) { + RapidsGdsStore.alignBufferSize(dm) + } else { + logDebug( + s"Source offset $srcOffset is not 4 KiB aligned, falling back to unaligned read") + dm.getLength + } // TODO: switch to async API when it's released, using the passed in CUDA stream. stream.sync() - CuFile.readFileToDeviceMemory( - dm.getAddress, RapidsGdsStore.alignBufferSize(dm), path, fileOffset + srcOffset) - logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") + CuFile.readFileToDeviceMemory(dm.getAddress, alignedSize, path, fileOffset + srcOffset) + logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS") case _ => throw new IllegalStateException( s"GDS can only copy to device buffer, not ${dst.getClass}") } @@ -242,11 +248,18 @@ class RapidsGdsStore( stream.sync() logDebug(s"Created device buffer $size from batch write buffer") } else { + val alignedSize = if (RapidsGdsStore.isAligned(srcOffset)) { + RapidsGdsStore.alignBufferSize(dm) + } else { + logDebug( + s"Source offset $srcOffset is not 4 KiB aligned, falling back to unaligned read") + dm.getLength + } // TODO: switch to async API when it's released, using the passed in CUDA stream. 
stream.sync() CuFile.readFileToDeviceMemory( - dm.getAddress, RapidsGdsStore.alignBufferSize(dm), path, fileOffset + srcOffset) - logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") + dm.getAddress, alignedSize, path, fileOffset + srcOffset) + logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS") } case _ => throw new IllegalStateException( s"GDS can only copy to device buffer, not ${dst.getClass}") @@ -276,6 +289,10 @@ object RapidsGdsStore { val AllocationAlignment = 4096L val AlignmentThreshold = 65536L + def isAligned(length: Long): Boolean = { + length % AllocationAlignment == 0 + } + def alignUp(length: Long): Long = { (length + AllocationAlignment - 1) & ~(AllocationAlignment - 1) } From 833bd9c526e67c4e3ff23505276942b3fa112ece Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Mon, 24 May 2021 11:38:26 -0700 Subject: [PATCH 5/7] turn off aligned with copyToMemoryBuffer() Signed-off-by: Rong Ou --- .../nvidia/spark/rapids/RapidsGdsStore.scala | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index ac2d4fc9a03..5597610dcd7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -100,16 +100,9 @@ class RapidsGdsStore( dst match { case dmOriginal: DeviceMemoryBuffer => val dm = dmOriginal.slice(dstOffset, length) - val alignedSize = if (RapidsGdsStore.isAligned(srcOffset)) { - RapidsGdsStore.alignBufferSize(dm) - } else { - logDebug( - s"Source offset $srcOffset is not 4 KiB aligned, falling back to unaligned read") - dm.getLength - } // TODO: switch to async API when it's released, using the passed in CUDA stream. stream.sync() - CuFile.readFileToDeviceMemory(dm.getAddress, alignedSize, path, fileOffset + srcOffset) + CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset) logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS") case _ => throw new IllegalStateException( s"GDS can only copy to device buffer, not ${dst.getClass}") @@ -248,17 +241,9 @@ class RapidsGdsStore( stream.sync() logDebug(s"Created device buffer $size from batch write buffer") } else { - val alignedSize = if (RapidsGdsStore.isAligned(srcOffset)) { - RapidsGdsStore.alignBufferSize(dm) - } else { - logDebug( - s"Source offset $srcOffset is not 4 KiB aligned, falling back to unaligned read") - dm.getLength - } // TODO: switch to async API when it's released, using the passed in CUDA stream. 
stream.sync() - CuFile.readFileToDeviceMemory( - dm.getAddress, alignedSize, path, fileOffset + srcOffset) + CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset) logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS") } case _ => throw new IllegalStateException( From b3f3abaf269c8cd96a564b46e3457ffdf7f86634 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Tue, 25 May 2021 15:38:11 -0700 Subject: [PATCH 6/7] add option to turn off aligned IO Signed-off-by: Rong Ou --- .../spark/rapids/GpuDeviceManager.scala | 15 ++++++----- .../spark/rapids/RapidsBufferCatalog.scala | 3 ++- .../com/nvidia/spark/rapids/RapidsConf.scala | 11 ++++++++ .../nvidia/spark/rapids/RapidsGdsStore.scala | 27 ++++++++++++++----- .../spark/rapids/RapidsGdsStoreSuite.scala | 5 ++-- 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index 1259e5936ee..99feddab61e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -265,13 +265,14 @@ object GpuDeviceManager extends Logging { logInfo("Using legacy default stream") } - val (allocationAlignment, alignmentThreshold) = if (conf.isGdsSpillEnabled) { - logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " + - s"alignment threshold = ${toKB(RapidsGdsStore.AlignmentThreshold)} KB") - (RapidsGdsStore.AllocationAlignment, RapidsGdsStore.AlignmentThreshold) - } else { - (0L, 0L) - } + val (allocationAlignment, alignmentThreshold) = + if (conf.isGdsSpillEnabled && conf.isGdsSpillAlignedIO) { + logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " + + s"alignment threshold = ${toKB(RapidsGdsStore.AlignmentThreshold)} KB") + (RapidsGdsStore.AllocationAlignment, RapidsGdsStore.AlignmentThreshold) + } else { + (0L, 0L) + } try { Cuda.setDevice(gpuId) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 40191b34b86..464a251e348 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -181,7 +181,8 @@ object RapidsBufferCatalog extends Logging with Arm { deviceStorage = new RapidsDeviceMemoryStore() val diskBlockManager = new RapidsDiskBlockManager(conf) if (rapidsConf.isGdsSpillEnabled) { - gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize) + gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize, + rapidsConf.isGdsSpillAlignedIO) deviceStorage.setSpillStore(gdsStorage) } else { hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a33d58bbeb9..3b14d1d670b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -378,6 +378,15 @@ object RapidsConf { .bytesConf(ByteUnit.BYTE) .createWithDefault(ByteUnit.MiB.toBytes(8)) + val GDS_SPILL_ALIGNED_IO = + conf("spark.rapids.memory.gpu.direct.storage.spill.alignedIO") + .doc("When GDS spill is 
enabled, should I/O be 4 KiB aligned. GDS is more efficient when " + + "reads and writes are 4 KiB aligned, but aligning has some additional memory overhead " + + "with the padding.") + .internal() + .booleanConf + .createWithDefault(true) + val POOLED_MEM = conf("spark.rapids.memory.gpu.pooling.enabled") .doc("Should RMM act as a pooling allocator for GPU memory, or should it just pass " + "through to CUDA memory allocation directly. DEPRECATED: please use " + @@ -1373,6 +1382,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val gdsSpillBatchWriteBufferSize: Long = get(GDS_SPILL_BATCH_WRITE_BUFFER_SIZE) + lazy val isGdsSpillAlignedIO: Boolean = get(GDS_SPILL_ALIGNED_IO) + lazy val hasNans: Boolean = get(HAS_NANS) lazy val gpuTargetBatchSizeBytes: Long = get(GPU_BATCH_SIZE_BYTES) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 5597610dcd7..963d02f6543 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -50,6 +50,7 @@ import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId} class RapidsGdsStore( diskBlockManager: RapidsDiskBlockManager, batchWriteBufferSize: Long, + alignedIO: Boolean, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends RapidsBufferStore(StorageTier.GDS, catalog) with Arm { private[this] val batchSpiller = new BatchSpiller() @@ -69,6 +70,14 @@ class RapidsGdsStore( } } + private def alignBufferSize(buffer: DeviceMemoryBuffer): Long = { + if (alignedIO) { + RapidsGdsStore.alignBufferSize(buffer) + } else { + buffer.getLength + } + } + abstract class RapidsGdsBuffer( override val id: RapidsBufferId, override val size: Long, @@ -88,8 +97,7 @@ class RapidsGdsStore( override def materializeMemoryBuffer: MemoryBuffer = { closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer => - CuFile.readFileToDeviceMemory( - buffer.getAddress, RapidsGdsStore.alignBufferSize(buffer), path, fileOffset) + CuFile.readFileToDeviceMemory(buffer.getAddress, alignBufferSize(buffer), path, fileOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") buffer } @@ -102,6 +110,8 @@ class RapidsGdsStore( val dm = dmOriginal.slice(dstOffset, length) // TODO: switch to async API when it's released, using the passed in CUDA stream. stream.sync() + // TODO: align the reads once https://github.com/NVIDIA/spark-rapids/issues/2492 is + // resolved. CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset) logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS") case _ => throw new IllegalStateException( @@ -124,7 +134,7 @@ class RapidsGdsStore( : RapidsBufferBase = { val id = other.id val path = id.getDiskPath(diskBlockManager) - val alignedSize = RapidsGdsStore.alignBufferSize(deviceBuffer) + val alignedSize = alignBufferSize(deviceBuffer) // When sharing files, append to the file; otherwise, write from the beginning. 
val fileOffset = if (id.canShareDiskPaths) { // only one writer at a time for now when using shared files @@ -149,10 +159,11 @@ class RapidsGdsStore( def spill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer): RapidsBufferBase = this.synchronized { - if (deviceBuffer.getLength > batchWriteBufferSize - currentOffset) { + val alignedSize = alignBufferSize(deviceBuffer) + if (alignedSize > batchWriteBufferSize - currentOffset) { val path = currentFile.getAbsolutePath withResource(new CuFileWriteHandle(path)) { handle => - handle.write(batchWriteBuffer, currentOffset, 0) + handle.write(batchWriteBuffer, batchWriteBufferSize, 0) logDebug(s"Spilled to $path 0:$currentOffset via GDS") } pendingBuffers.foreach(_.unsetPending()) @@ -168,7 +179,7 @@ class RapidsGdsStore( addBuffer(currentFile, id) val gdsBuffer = new RapidsGdsBatchedBuffer(id, currentFile, currentOffset, other.size, other.meta, other.getSpillPriority, other.spillCallback) - currentOffset += RapidsGdsStore.alignUp(deviceBuffer.getLength) + currentOffset += alignedSize pendingBuffers += gdsBuffer gdsBuffer } @@ -224,7 +235,7 @@ class RapidsGdsStore( logDebug(s"Created device buffer $size from batch write buffer") } else { CuFile.readFileToDeviceMemory( - buffer.getAddress, RapidsGdsStore.alignBufferSize(buffer), path, fileOffset) + buffer.getAddress, alignBufferSize(buffer), path, fileOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") } buffer @@ -243,6 +254,8 @@ class RapidsGdsStore( } else { // TODO: switch to async API when it's released, using the passed in CUDA stream. stream.sync() + // TODO: align the reads once https://github.com/NVIDIA/spark-rapids/issues/2492 is + // resolved. CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset) logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS") } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala index 2690d3084fb..3d2ee3f2b2b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala @@ -66,7 +66,7 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with val batchWriteBufferSize = 16384 // Holds 2 buffers. 
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => withResource(new RapidsGdsStore( - diskBlockManager, batchWriteBufferSize, catalog)) { gdsStore => + diskBlockManager, batchWriteBufferSize, false, catalog)) { gdsStore => devStore.setSpillStore(gdsStore) assertResult(0)(gdsStore.currentSize) @@ -111,7 +111,8 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with val spillPriority = -7 val catalog = spy(new RapidsBufferCatalog) withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, catalog)) { gdsStore => + withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, false, catalog)) { + gdsStore => devStore.setSpillStore(gdsStore) assertResult(0)(gdsStore.currentSize) val bufferSize = addTableToStore(devStore, bufferId, spillPriority) From 7dc420dc60851bcb8022ca5bc8c2756e4c386fc1 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 27 May 2021 14:07:16 -0700 Subject: [PATCH 7/7] add config for alignment threshold Signed-off-by: Rong Ou --- .../spark/rapids/GpuDeviceManager.scala | 4 +-- .../spark/rapids/RapidsBufferCatalog.scala | 2 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 11 +++++++ .../nvidia/spark/rapids/RapidsGdsStore.scala | 33 +++++++++---------- .../spark/rapids/RapidsGdsStoreSuite.scala | 4 +-- 5 files changed, 31 insertions(+), 23 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index 99feddab61e..f84faa600e5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -268,8 +268,8 @@ object GpuDeviceManager extends Logging { val (allocationAlignment, alignmentThreshold) = if (conf.isGdsSpillEnabled && conf.isGdsSpillAlignedIO) { logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " + - s"alignment threshold = ${toKB(RapidsGdsStore.AlignmentThreshold)} KB") - (RapidsGdsStore.AllocationAlignment, RapidsGdsStore.AlignmentThreshold) + s"alignment threshold = ${toKB(conf.gdsSpillAlignmentThreshold)} KB") + (RapidsGdsStore.AllocationAlignment, conf.gdsSpillAlignmentThreshold) } else { (0L, 0L) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 464a251e348..72968057b94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -182,7 +182,7 @@ object RapidsBufferCatalog extends Logging with Arm { val diskBlockManager = new RapidsDiskBlockManager(conf) if (rapidsConf.isGdsSpillEnabled) { gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize, - rapidsConf.isGdsSpillAlignedIO) + rapidsConf.isGdsSpillAlignedIO, rapidsConf.gdsSpillAlignmentThreshold) deviceStorage.setSpillStore(gdsStorage) } else { hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 3b14d1d670b..b7d71a82265 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -387,6 +387,15 @@ object 
RapidsConf { .booleanConf .createWithDefault(true) + val GDS_SPILL_ALIGNMENT_THRESHOLD = + conf("spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold") + .doc("GPU memory buffers with size above this threshold will be aligned to 4 KiB. Setting " + + "this value to 0 means every allocation will be 4 KiB aligned. A low threshold may " + + "cause more memory consumption because of padding.") + .internal() + .bytesConf(ByteUnit.BYTE) + .createWithDefault(ByteUnit.KiB.toBytes(64)) + val POOLED_MEM = conf("spark.rapids.memory.gpu.pooling.enabled") .doc("Should RMM act as a pooling allocator for GPU memory, or should it just pass " + "through to CUDA memory allocation directly. DEPRECATED: please use " + @@ -1384,6 +1393,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isGdsSpillAlignedIO: Boolean = get(GDS_SPILL_ALIGNED_IO) + lazy val gdsSpillAlignmentThreshold: Long = get(GDS_SPILL_ALIGNMENT_THRESHOLD) + lazy val hasNans: Boolean = get(HAS_NANS) lazy val gpuTargetBatchSizeBytes: Long = get(GPU_BATCH_SIZE_BYTES) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 963d02f6543..16ce2b42ec2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -51,6 +51,7 @@ class RapidsGdsStore( diskBlockManager: RapidsDiskBlockManager, batchWriteBufferSize: Long, alignedIO: Boolean, + alignmentThreshold: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends RapidsBufferStore(StorageTier.GDS, catalog) with Arm { private[this] val batchSpiller = new BatchSpiller() @@ -72,12 +73,21 @@ class RapidsGdsStore( private def alignBufferSize(buffer: DeviceMemoryBuffer): Long = { if (alignedIO) { - RapidsGdsStore.alignBufferSize(buffer) + RapidsGdsStore.alignUp(buffer.getLength) } else { buffer.getLength } } + private def alignLargeBufferSize(buffer: DeviceMemoryBuffer): Long = { + val length = buffer.getLength + if (alignedIO && length >= alignmentThreshold) { + RapidsGdsStore.alignUp(length) + } else { + length + } + } + abstract class RapidsGdsBuffer( override val id: RapidsBufferId, override val size: Long, @@ -97,7 +107,8 @@ class RapidsGdsStore( override def materializeMemoryBuffer: MemoryBuffer = { closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer => - CuFile.readFileToDeviceMemory(buffer.getAddress, alignBufferSize(buffer), path, fileOffset) + CuFile.readFileToDeviceMemory( + buffer.getAddress, alignLargeBufferSize(buffer), path, fileOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") buffer } @@ -134,7 +145,7 @@ class RapidsGdsStore( : RapidsBufferBase = { val id = other.id val path = id.getDiskPath(diskBlockManager) - val alignedSize = alignBufferSize(deviceBuffer) + val alignedSize = alignLargeBufferSize(deviceBuffer) // When sharing files, append to the file; otherwise, write from the beginning. 
val fileOffset = if (id.canShareDiskPaths) { // only one writer at a time for now when using shared files @@ -235,7 +246,7 @@ class RapidsGdsStore( logDebug(s"Created device buffer $size from batch write buffer") } else { CuFile.readFileToDeviceMemory( - buffer.getAddress, alignBufferSize(buffer), path, fileOffset) + buffer.getAddress, alignLargeBufferSize(buffer), path, fileOffset) logDebug(s"Created device buffer for $path $fileOffset:$size via GDS") } buffer @@ -285,22 +296,8 @@ class RapidsGdsStore( object RapidsGdsStore { val AllocationAlignment = 4096L - val AlignmentThreshold = 65536L - - def isAligned(length: Long): Boolean = { - length % AllocationAlignment == 0 - } def alignUp(length: Long): Long = { (length + AllocationAlignment - 1) & ~(AllocationAlignment - 1) } - - def alignBufferSize(buffer: DeviceMemoryBuffer): Long = { - val length = buffer.getLength - if (length < AlignmentThreshold) { - length - } else { - alignUp(length) - } - } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala index 3d2ee3f2b2b..eeaa02e2d0a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala @@ -66,7 +66,7 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with val batchWriteBufferSize = 16384 // Holds 2 buffers. withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => withResource(new RapidsGdsStore( - diskBlockManager, batchWriteBufferSize, false, catalog)) { gdsStore => + diskBlockManager, batchWriteBufferSize, false, 65536, catalog)) { gdsStore => devStore.setSpillStore(gdsStore) assertResult(0)(gdsStore.currentSize) @@ -111,7 +111,7 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with val spillPriority = -7 val catalog = spy(new RapidsBufferCatalog) withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, false, catalog)) { + withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, false, 65536, catalog)) { gdsStore => devStore.setSpillStore(gdsStore) assertResult(0)(gdsStore.currentSize)