align GDS reads/writes to 4 KiB (#2460)
* align GDS reads/writes to 4 KiB

Signed-off-by: Rong Ou <[email protected]>

* extract variable

Signed-off-by: Rong Ou <[email protected]>

* add comments for alignment

Signed-off-by: Rong Ou <[email protected]>

* add unaligned fallback

Signed-off-by: Rong Ou <[email protected]>

* turn off aligned IO with copyToMemoryBuffer()

Signed-off-by: Rong Ou <[email protected]>

* add option to turn off aligned IO

Signed-off-by: Rong Ou <[email protected]>

* add config for alignment threshold

Signed-off-by: Rong Ou <[email protected]>
rongou authored Jun 3, 2021
1 parent ea189f2 commit f70dbcb
Showing 5 changed files with 105 additions and 21 deletions.
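In effect, the change rounds GDS read and write sizes up to the next 4 KiB boundary for buffers at or above a configurable threshold, and otherwise falls back to unaligned IO. A minimal sketch of that size computation (editorial illustration, not code from this commit; the name effectiveIoSize is hypothetical):

def effectiveIoSize(length: Long, alignedIO: Boolean, alignmentThreshold: Long): Long =
  if (alignedIO && length >= alignmentThreshold) {
    (length + 4095L) & ~4095L // round up to the next 4 KiB boundary
  } else {
    length // unaligned fallback
  }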
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala
@@ -162,6 +162,8 @@ object GpuDeviceManager extends Logging {
}
}

private def toKB(x: Long): Double = x / 1024.0

private def toMB(x: Long): Double = x / 1024 / 1024.0

private def computeRmmInitSizes(conf: RapidsConf, info: CudaMemInfo): (Long, Long) = {
@@ -263,9 +265,19 @@
logInfo("Using legacy default stream")
}

val (allocationAlignment, alignmentThreshold) =
if (conf.isGdsSpillEnabled && conf.isGdsSpillAlignedIO) {
logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " +
s"alignment threshold = ${toKB(conf.gdsSpillAlignmentThreshold)} KB")
(RapidsGdsStore.AllocationAlignment, conf.gdsSpillAlignmentThreshold)
} else {
(0L, 0L)
}

try {
Cuda.setDevice(gpuId)
-Rmm.initialize(init, logConf, initialAllocation, maxAllocation)
+Rmm.initialize(
+  init, logConf, initialAllocation, maxAllocation, allocationAlignment, alignmentThreshold)
RapidsBufferCatalog.init(conf)
GpuShuffleEnv.init(conf)
} catch {
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala
@@ -181,7 +181,8 @@ object RapidsBufferCatalog extends Logging with Arm {
deviceStorage = new RapidsDeviceMemoryStore()
val diskBlockManager = new RapidsDiskBlockManager(conf)
if (rapidsConf.isGdsSpillEnabled) {
-gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize)
+gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize,
+  rapidsConf.isGdsSpillAlignedIO, rapidsConf.gdsSpillAlignmentThreshold)
deviceStorage.setSpillStore(gdsStorage)
} else {
hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala (22 additions & 0 deletions)
@@ -378,6 +378,24 @@ object RapidsConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(ByteUnit.MiB.toBytes(8))

val GDS_SPILL_ALIGNED_IO =
conf("spark.rapids.memory.gpu.direct.storage.spill.alignedIO")
.doc("When GDS spill is enabled, should I/O be 4 KiB aligned. GDS is more efficient when " +
"reads and writes are 4 KiB aligned, but aligning has some additional memory overhead " +
"with the padding.")
.internal()
.booleanConf
.createWithDefault(true)

val GDS_SPILL_ALIGNMENT_THRESHOLD =
conf("spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold")
.doc("GPU memory buffers with size above this threshold will be aligned to 4 KiB. Setting " +
"this value to 0 means every allocation will be 4 KiB aligned. A low threshold may " +
"cause more memory consumption because of padding.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(ByteUnit.KiB.toBytes(64))
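// Editorial sketch, not part of this commit: both settings are internal, but could
// hypothetically be overridden at session setup via SparkConf; byte sizes accept
// Spark's usual suffixes (e.g. "128k"):
//   new SparkConf()
//     .set("spark.rapids.memory.gpu.direct.storage.spill.alignedIO", "false")
//     .set("spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold", "128k")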

val POOLED_MEM = conf("spark.rapids.memory.gpu.pooling.enabled")
.doc("Should RMM act as a pooling allocator for GPU memory, or should it just pass " +
"through to CUDA memory allocation directly. DEPRECATED: please use " +
@@ -1373,6 +1391,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val gdsSpillBatchWriteBufferSize: Long = get(GDS_SPILL_BATCH_WRITE_BUFFER_SIZE)

lazy val isGdsSpillAlignedIO: Boolean = get(GDS_SPILL_ALIGNED_IO)

lazy val gdsSpillAlignmentThreshold: Long = get(GDS_SPILL_ALIGNMENT_THRESHOLD)

lazy val hasNans: Boolean = get(HAS_NANS)

lazy val gpuTargetBatchSizeBytes: Long = get(GPU_BATCH_SIZE_BYTES)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala
@@ -28,10 +28,30 @@ import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId}

-/** A buffer store using GPUDirect Storage (GDS). */
/**
* A buffer store using GPUDirect Storage (GDS).
*
* GDS is more efficient when IO is aligned.
*
* An IO is unaligned if one of the following conditions is true:
* - The file_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned.
* - The size that was issued in cuFileRead/cuFileWrite is not 4K aligned.
* - The devPtr_base that was issued in cuFileRead/cuFileWrite is not 4K aligned.
* - The devPtr_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned.
*
* To avoid unaligned IO, when GDS spilling is enabled, the RMM `aligned_resource_adapter` is used
* so that large buffers above a certain size threshold are allocated with a 4K-aligned base
* pointer and size.
*
* When reading and writing these large buffers through GDS, the size is aligned up to the next 4K
* boundary. Although the aligned size appears to be out of bounds, the extra space needed is held
* in reserve by the RMM `aligned_resource_adapter`.
*/
class RapidsGdsStore(
diskBlockManager: RapidsDiskBlockManager,
batchWriteBufferSize: Long,
alignedIO: Boolean,
alignmentThreshold: Long,
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
extends RapidsBufferStore(StorageTier.GDS, catalog) with Arm {
private[this] val batchSpiller = new BatchSpiller()
@@ -51,6 +71,23 @@ class RapidsGdsStore(
}
}

/** When aligned IO is on, round every buffer size up to 4 KiB; used by the batch spiller. */
private def alignBufferSize(buffer: DeviceMemoryBuffer): Long = {
if (alignedIO) {
RapidsGdsStore.alignUp(buffer.getLength)
} else {
buffer.getLength
}
}

/**
* When aligned IO is on, round sizes up to 4 KiB only for buffers at or above the alignment
* threshold; smaller buffers fall back to unaligned IO to avoid padding overhead.
*/
private def alignLargeBufferSize(buffer: DeviceMemoryBuffer): Long = {
val length = buffer.getLength
if (alignedIO && length >= alignmentThreshold) {
RapidsGdsStore.alignUp(length)
} else {
length
}
}
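// Editorial illustration, not part of this commit: with the default 64 KiB
// threshold, alignLargeBufferSize rounds a 100,000-byte buffer up to
// RapidsGdsStore.alignUp(100000L) == 102400 bytes (25 * 4096), while a
// 10,000-byte buffer stays below the threshold and keeps its exact length
// (the unaligned fallback).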

abstract class RapidsGdsBuffer(
override val id: RapidsBufferId,
override val size: Long,
@@ -70,7 +107,8 @@

override def materializeMemoryBuffer: MemoryBuffer = {
closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer =>
-CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
+CuFile.readFileToDeviceMemory(
+  buffer.getAddress, alignLargeBufferSize(buffer), path, fileOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
buffer
}
@@ -83,8 +121,10 @@
val dm = dmOriginal.slice(dstOffset, length)
// TODO: switch to async API when it's released, using the passed in CUDA stream.
stream.sync()
// TODO: align the reads once https://github.com/NVIDIA/spark-rapids/issues/2492 is
// resolved.
CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS")
case _ => throw new IllegalStateException(
s"GDS can only copy to device buffer, not ${dst.getClass}")
}
@@ -102,17 +142,18 @@
}

private def singleShotSpill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer)
: RapidsBufferBase = {
val id = other.id
val path = id.getDiskPath(diskBlockManager)
val alignedSize = alignLargeBufferSize(deviceBuffer)
// When sharing files, append to the file; otherwise, write from the beginning.
val fileOffset = if (id.canShareDiskPaths) {
// only one writer at a time for now when using shared files
path.synchronized {
-CuFile.appendDeviceBufferToFile(path, deviceBuffer)
+CuFile.appendDeviceMemoryToFile(path, deviceBuffer.getAddress, alignedSize)
}
} else {
-CuFile.writeDeviceBufferToFile(path, 0, deviceBuffer)
+CuFile.writeDeviceMemoryToFile(path, 0, deviceBuffer.getAddress, alignedSize)
0
}
logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS")
@@ -121,7 +162,6 @@
}

class BatchSpiller() {
-private val blockSize = 4096
private[this] val spilledBuffers = new ConcurrentHashMap[File, Set[RapidsBufferId]]
private[this] val pendingBuffers = ArrayBuffer.empty[RapidsGdsBatchedBuffer]
private[this] val batchWriteBuffer = CuFileBuffer.allocate(batchWriteBufferSize, true)
@@ -130,10 +170,11 @@

def spill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer): RapidsBufferBase =
this.synchronized {
-if (deviceBuffer.getLength > batchWriteBufferSize - currentOffset) {
+val alignedSize = alignBufferSize(deviceBuffer)
+if (alignedSize > batchWriteBufferSize - currentOffset) {
val path = currentFile.getAbsolutePath
withResource(new CuFileWriteHandle(path)) { handle =>
-handle.write(batchWriteBuffer, currentOffset, 0)
+handle.write(batchWriteBuffer, batchWriteBufferSize, 0)
logDebug(s"Spilled to $path 0:$currentOffset via GDS")
}
pendingBuffers.foreach(_.unsetPending())
@@ -149,15 +190,11 @@
addBuffer(currentFile, id)
val gdsBuffer = new RapidsGdsBatchedBuffer(id, currentFile, currentOffset,
other.size, other.meta, other.getSpillPriority, other.spillCallback)
-currentOffset += alignUp(deviceBuffer.getLength)
+currentOffset += alignedSize
pendingBuffers += gdsBuffer
gdsBuffer
}

-private def alignUp(length: Long): Long = {
-  (length + blockSize - 1) & ~(blockSize - 1)
-}

private def copyToBuffer(
buffer: MemoryBuffer, offset: Long, size: Long, stream: Cuda.Stream): Unit = {
buffer.copyFromMemoryBuffer(0, batchWriteBuffer, offset, size, stream)
@@ -208,7 +245,8 @@
Cuda.DEFAULT_STREAM.sync()
logDebug(s"Created device buffer $size from batch write buffer")
} else {
-CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
+CuFile.readFileToDeviceMemory(
+  buffer.getAddress, alignLargeBufferSize(buffer), path, fileOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
}
buffer
@@ -227,8 +265,10 @@
} else {
// TODO: switch to async API when it's released, using the passed in CUDA stream.
stream.sync()
// TODO: align the reads once https://github.com/NVIDIA/spark-rapids/issues/2492 is
// resolved.
CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
logDebug(s"Created device buffer for $path ${fileOffset + srcOffset}:$length via GDS")
}
case _ => throw new IllegalStateException(
s"GDS can only copy to device buffer, not ${dst.getClass}")
@@ -252,4 +292,12 @@
}
}
}
}
}

object RapidsGdsStore {
val AllocationAlignment = 4096L

def alignUp(length: Long): Long = {
(length + AllocationAlignment - 1) & ~(AllocationAlignment - 1)
}
}
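The mask arithmetic in alignUp requires AllocationAlignment to be a power of two: adding alignment - 1 and then clearing the low bits rounds up, and already-aligned lengths pass through unchanged. A few editorial spot checks (not part of the commit):

RapidsGdsStore.alignUp(0L)    // == 0
RapidsGdsStore.alignUp(1L)    // == 4096
RapidsGdsStore.alignUp(4096L) // == 4096
RapidsGdsStore.alignUp(4097L) // == 8192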
tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
@@ -66,7 +66,7 @@ class RapidsGdsStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with
val batchWriteBufferSize = 16384 // Holds 2 buffers.
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsGdsStore(
-diskBlockManager, batchWriteBufferSize, catalog)) { gdsStore =>
+diskBlockManager, batchWriteBufferSize, false, 65536, catalog)) { gdsStore =>

devStore.setSpillStore(gdsStore)
assertResult(0)(gdsStore.currentSize)
@@ -111,7 +111,8 @@
val spillPriority = -7
val catalog = spy(new RapidsBufferCatalog)
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
-withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, catalog)) { gdsStore =>
+withResource(new RapidsGdsStore(mock[RapidsDiskBlockManager], 4096, false, 65536, catalog)) {
+  gdsStore =>
devStore.setSpillStore(gdsStore)
assertResult(0)(gdsStore.currentSize)
val bufferSize = addTableToStore(devStore, bufferId, spillPriority)
