Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

align GDS reads/writes to 4 KiB #2460

Merged
merged 16 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ object GpuDeviceManager extends Logging {
}
}

/** Converts a byte count to KiB as a fractional value (for log messages). */
private def toKB(x: Long): Double = x.toDouble / 1024

/**
 * Converts a byte count to MiB as a fractional value (for log messages).
 *
 * Both divisors are doubles so the first division does not truncate: the original
 * `x / 1024 / 1024.0` did Long/Int integer division first, silently dropping any
 * sub-KiB remainder (e.g. toMB(1) evaluated to 0.0).
 */
private def toMB(x: Long): Double = x / 1024.0 / 1024.0

private def computeRmmInitSizes(conf: RapidsConf, info: CudaMemInfo): (Long, Long) = {
Expand Down Expand Up @@ -263,9 +265,18 @@ object GpuDeviceManager extends Logging {
logInfo("Using legacy default stream")
}

val (allocationAlignment, alignmentThreshold) = if (conf.isGdsSpillEnabled) {
logInfo(s"Using allocation alignment = ${toKB(RapidsGdsStore.AllocationAlignment)} KB, " +
s"alignment threshold = ${toKB(RapidsGdsStore.AlignmentThreshold)} KB")
(RapidsGdsStore.AllocationAlignment, RapidsGdsStore.AlignmentThreshold)
} else {
(0L, 0L)
}

try {
Cuda.setDevice(gpuId)
Rmm.initialize(init, logConf, initialAllocation, maxAllocation)
Rmm.initialize(
init, logConf, initialAllocation, maxAllocation, allocationAlignment, alignmentThreshold)
RapidsBufferCatalog.init(conf)
GpuShuffleEnv.init(conf)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,25 @@ import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId}

/** A buffer store using GPUDirect Storage (GDS). */
/**
* A buffer store using GPUDirect Storage (GDS).
*
* GDS is more efficient when IO is aligned.
*
* An IO is unaligned if one of the following conditions is true:
* - The file_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned.
* - The size that was issued in cuFileRead/cuFileWrite is not 4K aligned.
* - The devPtr_base that was issued in cuFileRead/cuFileWrite is not 4K aligned.
* - The devPtr_offset that was issued in cuFileRead/cuFileWrite is not 4K aligned.
*
* To avoid unaligned IO, when GDS spilling is enabled, the RMM `aligned_resource_adapter` is used
* so that large buffers above certain size threshold are allocated with 4K aligned base pointer
* and size.
*
* When reading and writing these large buffers through GDS, the size is aligned up to the next 4K
* boundary. Although the aligned size appears to be out of bound, the extra space needed is held
* in reserve by the RMM `aligned_resource_adapter`.
*/
class RapidsGdsStore(
diskBlockManager: RapidsDiskBlockManager,
batchWriteBufferSize: Long,
Expand Down Expand Up @@ -70,7 +88,8 @@ class RapidsGdsStore(

override def materializeMemoryBuffer: MemoryBuffer = {
closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer =>
CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
CuFile.readFileToDeviceMemory(
buffer.getAddress, RapidsGdsStore.alignBufferSize(buffer), path, fileOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
buffer
}
Expand All @@ -83,7 +102,8 @@ class RapidsGdsStore(
val dm = dmOriginal.slice(dstOffset, length)
// TODO: switch to async API when it's released, using the passed in CUDA stream.
stream.sync()
CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
CuFile.readFileToDeviceMemory(
dm.getAddress, RapidsGdsStore.alignBufferSize(dm), path, fileOffset + srcOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
case _ => throw new IllegalStateException(
s"GDS can only copy to device buffer, not ${dst.getClass}")
Expand All @@ -102,17 +122,18 @@ class RapidsGdsStore(
}

private def singleShotSpill(other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer)
: RapidsBufferBase = {
: RapidsBufferBase = {
val id = other.id
val path = id.getDiskPath(diskBlockManager)
val alignedSize = RapidsGdsStore.alignBufferSize(deviceBuffer)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
// When sharing files, append to the file; otherwise, write from the beginning.
val fileOffset = if (id.canShareDiskPaths) {
// only one writer at a time for now when using shared files
path.synchronized {
CuFile.appendDeviceBufferToFile(path, deviceBuffer)
CuFile.appendDeviceMemoryToFile(path, deviceBuffer.getAddress, alignedSize)
}
} else {
CuFile.writeDeviceBufferToFile(path, 0, deviceBuffer)
CuFile.writeDeviceMemoryToFile(path, 0, deviceBuffer.getAddress, alignedSize)
0
}
logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS")
Expand All @@ -121,7 +142,6 @@ class RapidsGdsStore(
}

class BatchSpiller() {
private val blockSize = 4096
private[this] val spilledBuffers = new ConcurrentHashMap[File, Set[RapidsBufferId]]
private[this] val pendingBuffers = ArrayBuffer.empty[RapidsGdsBatchedBuffer]
private[this] val batchWriteBuffer = CuFileBuffer.allocate(batchWriteBufferSize, true)
Expand Down Expand Up @@ -149,15 +169,11 @@ class RapidsGdsStore(
addBuffer(currentFile, id)
val gdsBuffer = new RapidsGdsBatchedBuffer(id, currentFile, currentOffset,
other.size, other.meta, other.getSpillPriority, other.spillCallback)
currentOffset += alignUp(deviceBuffer.getLength)
currentOffset += RapidsGdsStore.alignUp(deviceBuffer.getLength)
pendingBuffers += gdsBuffer
gdsBuffer
}

private def alignUp(length: Long): Long = {
(length + blockSize - 1) & ~(blockSize - 1)
}

private def copyToBuffer(
buffer: MemoryBuffer, offset: Long, size: Long, stream: Cuda.Stream): Unit = {
buffer.copyFromMemoryBuffer(0, batchWriteBuffer, offset, size, stream)
Expand Down Expand Up @@ -208,7 +224,8 @@ class RapidsGdsStore(
Cuda.DEFAULT_STREAM.sync()
logDebug(s"Created device buffer $size from batch write buffer")
} else {
CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
CuFile.readFileToDeviceMemory(
buffer.getAddress, RapidsGdsStore.alignBufferSize(buffer), path, fileOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
}
buffer
Expand All @@ -227,7 +244,8 @@ class RapidsGdsStore(
} else {
// TODO: switch to async API when it's released, using the passed in CUDA stream.
stream.sync()
CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
CuFile.readFileToDeviceMemory(
dm.getAddress, RapidsGdsStore.alignBufferSize(dm), path, fileOffset + srcOffset)
logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
}
case _ => throw new IllegalStateException(
Expand All @@ -252,4 +270,22 @@ class RapidsGdsStore(
}
}
}
}
}

object RapidsGdsStore {
  /** GDS IO alignment boundary: reads/writes are aligned to 4 KiB. */
  val AllocationAlignment = 4096L

  /** Buffers at or above this size (64 KiB) get alignment treatment. */
  val AlignmentThreshold = 65536L

  /**
   * Rounds `length` up to the next multiple of [[AllocationAlignment]].
   *
   * Relies on the alignment being a power of two so the round-up can be done
   * with a mask instead of division.
   */
  def alignUp(length: Long): Long = {
    val mask = AllocationAlignment - 1
    (length + mask) & ~mask
  }

  /**
   * Returns the size to use for GDS IO on `buffer`: the raw length for small
   * buffers, or the length aligned up to 4 KiB once it reaches
   * [[AlignmentThreshold]].
   */
  def alignBufferSize(buffer: DeviceMemoryBuffer): Long = {
    val length = buffer.getLength
    if (length >= AlignmentThreshold) alignUp(length) else length
  }
}