
Make tables spillable by default #8264

Merged: 11 commits, May 23, 2023
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala
@@ -42,6 +42,11 @@ object GpuDeviceManager extends Logging {
java.lang.Boolean.getBoolean("com.nvidia.spark.rapids.memory.gpu.rmm.init.task")
}

// Memory resource used only for cudf::chunked_pack to allocate scratch space
// during spill to host. This is done to set aside some memory for this operation
// from the beginning of the job.
var chunkedPackMemoryResource: Option[RmmPoolMemoryResource[RmmCudaMemoryResource]] = None

// for testing only
def setRmmTaskInitEnabled(enabled: Boolean): Unit = {
rmmTaskInitEnabled = enabled
@@ -141,6 +146,10 @@
def shutdown(): Unit = synchronized {
// assume error during shutdown until we complete it
singletonMemoryInitialized = Errored

chunkedPackMemoryResource.foreach(_.close)
chunkedPackMemoryResource = None

RapidsBufferCatalog.close()
GpuShuffleEnv.shutdown()
// try to avoid segfault on RMM shutdown
@@ -246,8 +255,21 @@
private def initializeRmm(gpuId: Int, rapidsConf: Option[RapidsConf]): Unit = {
if (!Rmm.isInitialized) {
val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
val info = Cuda.memGetInfo()

val poolSize = conf.chunkedPackPoolSize
chunkedPackMemoryResource =
if (poolSize > 0) {
val chunkedPackPool =
new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize)
logDebug(
s"Initialized pool resource for spill operations " +
s"of ${chunkedPackPool.getMaxSize} Bytes")
Some(chunkedPackPool)
} else {
None
}

val info = Cuda.memGetInfo()
val poolAllocation = computeRmmPoolSize(conf, info)
var init = RmmAllocationMode.CUDA_DEFAULT
val features = ArrayBuffer[String]()
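For context, the reservation above amounts to creating a fixed-size RMM pool backed by the plain CUDA resource, sized once at startup so the scratch space survives even when the main pool is under pressure. A minimal standalone sketch of the same pattern (the `reservePackPool` helper is illustrative, not part of this change):

```scala
import ai.rapids.cudf.{RmmCudaMemoryResource, RmmPoolMemoryResource}

// Reserve a fixed-size pool for chunked_pack scratch space. Using the same
// value for the initial and maximum size keeps the reservation stable for
// the lifetime of the job.
def reservePackPool(
    poolSize: Long): Option[RmmPoolMemoryResource[RmmCudaMemoryResource]] = {
  if (poolSize > 0) {
    Some(new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize))
  } else {
    None
  }
}
```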
194 changes: 190 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala
@@ -18,11 +18,15 @@ package com.nvidia.spark.rapids

import java.io.File

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.RapidsDiskBlockManager
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -59,16 +63,190 @@ object StorageTier extends Enumeration {
val GDS: StorageTier = Value(3, "GPUDirect Storage")
}

/**
* ChunkedPacker is an Iterator that uses a cudf::chunked_pack to copy a cuDF `Table`
* to a target buffer in chunks.
*
* Each chunk is sized at most `bounceBuffer.getLength`, and the caller should cudaMemcpy
* bytes from `bounceBuffer` to a target buffer after each call to `next()`.
*
* @note `ChunkedPacker` must be closed by the caller as it has GPU and host resources
* associated with it.
*
* @param id The RapidsBufferId for this pack operation to be included in the metadata
* @param table cuDF Table to chunk_pack
* @param bounceBuffer GPU memory to be used for packing. The buffer should be at least 1MB
* in length.
*/
class ChunkedPacker(
id: RapidsBufferId,
table: Table,
bounceBuffer: DeviceMemoryBuffer)
extends Iterator[MemoryBuffer]
with Logging
with AutoCloseable {

private var closed: Boolean = false

// When creating cudf::chunked_pack use a pool if available, otherwise default to the
// per-device memory resource
private val chunkedPack = {
val pool = GpuDeviceManager.chunkedPackMemoryResource
val cudfChunkedPack = try {
pool.map { chunkedPool =>
table.makeChunkedPack(bounceBuffer.getLength, chunkedPool)
}
} catch {
case _: OutOfMemoryError =>
if (!ChunkedPacker.warnedAboutPoolFallback) {
ChunkedPacker.warnedAboutPoolFallback = true
logWarning(
s"OOM while creating chunked_pack using pool sized " +
s"${pool.map(_.getMaxSize).getOrElse(0L)}B. " +
"Falling back to the per-device memory resource.")
}
None
}

// if the pool is not configured, or we hit an OOM above, fall back to the
// per-device memory resource
cudfChunkedPack.getOrElse {
table.makeChunkedPack(bounceBuffer.getLength)
}
}

private val tableMeta = withResource(chunkedPack.buildMetadata()) { packedMeta =>
MetaUtils.buildTableMeta(
id.tableId,
chunkedPack.getTotalContiguousSize,
packedMeta.getMetadataDirectBuffer,
table.getRowCount)
}

// take out a lease on the bounce buffer
bounceBuffer.incRefCount()

def getTotalContiguousSize: Long = chunkedPack.getTotalContiguousSize

def getMeta: TableMeta = {
tableMeta
}

override def hasNext: Boolean = {
!closed && chunkedPack.hasNext
}

override def next(): MemoryBuffer = {
val bytesWritten = chunkedPack.next(bounceBuffer)
// the slice bumps the bounce buffer's ref count: the caller has no idea
// where this memory came from, so it can (and must) close the returned buffer
bounceBuffer.slice(0, bytesWritten)
}

override def close(): Unit = synchronized {
if (!closed) {
closed = true
val toClose = new ArrayBuffer[AutoCloseable]()
toClose.append(chunkedPack, bounceBuffer)
toClose.safeClose()
}
}
}

object ChunkedPacker {
private var warnedAboutPoolFallback: Boolean = false
}
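To make the iterator contract concrete, here is a hedged sketch of draining a `ChunkedPacker` into a single host buffer; `table` and `id` are assumed to be in scope, and the bounce buffer size is arbitrary:

```scala
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer}
import com.nvidia.spark.rapids.Arm.withResource

// Pack `table` into one contiguous host buffer, one bounce-buffer-sized
// chunk at a time.
withResource(DeviceMemoryBuffer.allocate(128L * 1024 * 1024)) { bounceBuffer =>
  withResource(new ChunkedPacker(id, table, bounceBuffer)) { packer =>
    val packed = HostMemoryBuffer.allocate(packer.getTotalContiguousSize)
    var offset = 0L
    while (packer.hasNext) {
      // each chunk is a slice of the bounce buffer and must be closed
      withResource(packer.next()) { chunk =>
        packed.copyFromMemoryBuffer(offset, chunk, 0, chunk.getLength,
          Cuda.DEFAULT_STREAM)
        offset += chunk.getLength
      }
    }
    // `packed` plus `packer.getMeta` is everything needed to reconstruct
    // the table later
  }
}
```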

/**
* This iterator encapsulates a buffer's internal `MemoryBuffer` access
* for spill purposes. Internally, there are two known implementations:
* - either this is a "single shot" copy, where the entirety of the `RapidsBuffer` is
* already represented as a single contiguous blob of memory, and the expectation
* is that this iterator is exhausted with a single call to `next`
* - or, we have a `RapidsBuffer` that isn't contiguous. This iterator will then
* drive a `ChunkedPacker` to pack the `RapidsBuffer`'s table as needed, and
* will likely need several calls to `next` to be exhausted.
*
* @param buffer `RapidsBuffer` to copy out of its tier.
*/
class RapidsBufferCopyIterator(buffer: RapidsBuffer)
extends Iterator[MemoryBuffer] with AutoCloseable with Logging {

private val chunkedPacker: Option[ChunkedPacker] = if (buffer.supportsChunkedPacker) {
Some(buffer.getChunkedPacker)
} else {
None
}

def isChunked: Boolean = chunkedPacker.isDefined

// this is used for the single shot case to flag when `next` has been called,
// to satisfy the Iterator interface
private var singleShotCopyHasNext: Boolean = false
private var singleShotBuffer: MemoryBuffer = _

if (!isChunked) {
singleShotCopyHasNext = true
singleShotBuffer = buffer.getMemoryBuffer
}

override def hasNext: Boolean =
chunkedPacker.map(_.hasNext).getOrElse(singleShotCopyHasNext)

override def next(): MemoryBuffer = {
require(hasNext,
"next called on exhausted iterator")
chunkedPacker.map(_.next()).getOrElse {
singleShotCopyHasNext = false
singleShotBuffer.slice(0, singleShotBuffer.getLength)
}
}

def getTotalCopySize: Long = {
chunkedPacker
.map(_.getTotalContiguousSize)
.getOrElse(singleShotBuffer.getLength)
}

override def close(): Unit = {
val hasNextBeforeClose = hasNext
val toClose = new ArrayBuffer[AutoCloseable]()
toClose.appendAll(chunkedPacker)
toClose.appendAll(Option(singleShotBuffer))

toClose.safeClose()
require(!hasNextBeforeClose,
"RapidsBufferCopyIterator was closed before being exhausted")
}
}
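A consuming store does not need to know which of the two cases it received. A minimal sketch of a tier-to-host copy under that assumption (`buffer` is any `RapidsBuffer` in scope; note the target is sized with `getTotalCopySize`, which reflects the packed size rather than `getMemoryUsedBytes`):

```scala
import ai.rapids.cudf.{Cuda, HostMemoryBuffer}
import com.nvidia.spark.rapids.Arm.withResource

// Copy a RapidsBuffer out of its current tier into host memory, without
// caring whether the copy is single shot or chunked.
withResource(buffer.getCopyIterator) { copyIter =>
  val target = HostMemoryBuffer.allocate(copyIter.getTotalCopySize)
  var offset = 0L
  while (copyIter.hasNext) {
    withResource(copyIter.next()) { mb =>
      target.copyFromMemoryBuffer(offset, mb, 0, mb.getLength,
        Cuda.DEFAULT_STREAM)
      offset += mb.getLength
    }
  }
  // ownership of `target` now passes to the receiving store
}
```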

/** Interface provided by all types of RAPIDS buffers */
trait RapidsBuffer extends AutoCloseable {
/** The buffer identifier for this buffer. */
val id: RapidsBufferId

/** The size of this buffer in bytes. */
val size: Long
/**
* The size of this buffer in bytes in its _current_ store. As the buffer goes through
* a contiguous split (either because it was added as a contiguous table already, or
* because it spilled to host), its size changes, since contiguous_split adds its own
* alignment padding.
*
* @note Do not use this size to allocate a target buffer for a copy; always use
* `getPackedSizeBytes`.
*/
def getMemoryUsedBytes: Long

/**
* The size of this buffer if it has already gone through contiguous_split.
*
* @note Use this function when allocating a target buffer for spill or shuffle purposes.
*/
def getPackedSizeBytes: Long = getMemoryUsedBytes

/**
* At spill time, obtain an iterator used to copy this buffer to a different tier.
*/
def getCopyIterator: RapidsBufferCopyIterator =
new RapidsBufferCopyIterator(this)

/** Descriptor for how the memory buffer is formatted */
val meta: TableMeta
def meta: TableMeta

/** The storage tier for this buffer */
val storageTier: StorageTier
@@ -94,6 +272,12 @@ trait RapidsBuffer extends AutoCloseable {
*/
def getMemoryBuffer: MemoryBuffer

val supportsChunkedPacker: Boolean = false

def getChunkedPacker: ChunkedPacker = {
throw new NotImplementedError("not implemented for this store")
}

/**
* Copy the content of this buffer into the specified memory buffer, starting from the given
* offset.
@@ -163,7 +347,9 @@ trait RapidsBuffer extends AutoCloseable {
sealed class DegenerateRapidsBuffer(
override val id: RapidsBufferId,
override val meta: TableMeta) extends RapidsBuffer {
override val size: Long = 0L

override def getMemoryUsedBytes: Long = 0L

override val storageTier: StorageTier = StorageTier.DEVICE

override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = {