From 6bd9768b14fb6c3118a7d0149121981e6952fe0e Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 23 May 2023 16:18:38 -0500 Subject: [PATCH] Make tables spillable by default (#8264) * Make tables spillable by default --------- Signed-off-by: Alessandro Bellina --- .../spark/rapids/GpuDeviceManager.scala | 24 +- .../nvidia/spark/rapids/RapidsBuffer.scala | 194 ++++++++++- .../spark/rapids/RapidsBufferCatalog.scala | 78 ++++- .../spark/rapids/RapidsBufferStore.scala | 41 +-- .../com/nvidia/spark/rapids/RapidsConf.scala | 24 ++ .../rapids/RapidsDeviceMemoryStore.scala | 327 ++++++++++++++++-- .../nvidia/spark/rapids/RapidsDiskStore.scala | 14 +- .../nvidia/spark/rapids/RapidsGdsStore.scala | 22 +- .../spark/rapids/RapidsHostMemoryStore.scala | 78 +++-- .../spark/rapids/SpillableColumnarBatch.scala | 11 +- .../rapids/shuffle/BufferSendState.scala | 2 +- .../shuffle/RapidsShuffleIterator.scala | 2 +- .../com/nvidia/spark/rapids/CastOpSuite.scala | 9 +- .../rapids/RapidsBufferCatalogSuite.scala | 60 ++-- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 192 +++++++++- .../spark/rapids/RapidsDiskStoreSuite.scala | 2 +- .../spark/rapids/RapidsGdsStoreSuite.scala | 4 +- .../rapids/RapidsHostMemoryStoreSuite.scala | 15 +- .../spark/rapids/RmmSparkRetrySuiteBase.scala | 1 + .../nvidia/spark/rapids/WithRetrySuite.scala | 1 + .../shuffle/RapidsShuffleIteratorSuite.scala | 2 +- .../shuffle/RapidsShuffleServerSuite.scala | 8 +- .../rapids/SpillableColumnarBatchSuite.scala | 4 +- 23 files changed, 933 insertions(+), 182 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index 43f2bea56f7..6e3e818e29b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -42,6 +42,11 @@ object GpuDeviceManager extends Logging { java.lang.Boolean.getBoolean("com.nvidia.spark.rapids.memory.gpu.rmm.init.task") } + // Memory resource used only for cudf::chunked_pack to allocate scratch space + // during spill to host. This is done to set aside some memory for this operation + // from the beginning of the job. 
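+  // Sizing and fallback (see ChunkedPacker in RapidsBuffer.scala): the pool is sized
+  // by the internal config spark.rapids.sql.chunkedPack.poolSize (10MB by default) and
+  // is handed to Table.makeChunkedPack; if an allocation from it fails, the packer
+  // falls back to the per-device memory resource.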
+  var chunkedPackMemoryResource: Option[RmmPoolMemoryResource[RmmCudaMemoryResource]] = None
+
   // for testing only
   def setRmmTaskInitEnabled(enabled: Boolean): Unit = {
     rmmTaskInitEnabled = enabled
@@ -141,6 +146,10 @@ object GpuDeviceManager extends Logging {
   def shutdown(): Unit = synchronized {
     // assume error during shutdown until we complete it
     singletonMemoryInitialized = Errored
+
+    chunkedPackMemoryResource.foreach(_.close)
+    chunkedPackMemoryResource = None
+
     RapidsBufferCatalog.close()
     GpuShuffleEnv.shutdown()
     // try to avoid segfault on RMM shutdown
@@ -246,8 +255,21 @@ object GpuDeviceManager extends Logging {
   private def initializeRmm(gpuId: Int, rapidsConf: Option[RapidsConf]): Unit = {
     if (!Rmm.isInitialized) {
       val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
-      val info = Cuda.memGetInfo()
 
+      val poolSize = conf.chunkedPackPoolSize
+      chunkedPackMemoryResource =
+        if (poolSize > 0) {
+          val chunkedPackPool =
+            new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize)
+          logDebug(
+            s"Initialized pool resource for spill operations " +
+              s"of ${chunkedPackPool.getMaxSize} Bytes")
+          Some(chunkedPackPool)
+        } else {
+          None
+        }
+
+      val info = Cuda.memGetInfo()
       val poolAllocation = computeRmmPoolSize(conf, info)
       var init = RmmAllocationMode.CUDA_DEFAULT
       val features = ArrayBuffer[String]()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala
index 61394b47850..b53a6e3a6fb 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala
@@ -18,11 +18,15 @@ package com.nvidia.spark.rapids
 
 import java.io.File
 
+import scala.collection.mutable.ArrayBuffer
+
 import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table}
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.rapids.RapidsDiskBlockManager
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -59,16 +63,190 @@ object StorageTier extends Enumeration {
   val GDS: StorageTier = Value(3, "GPUDirect Storage")
 }
 
+/**
+ * ChunkedPacker is an Iterator that uses a cudf::chunked_pack to copy a cuDF `Table`
+ * to a target buffer in chunks.
+ *
+ * Each chunk is sized at most `bounceBuffer.getLength`, and the caller should cudaMemcpy
+ * bytes from `bounceBuffer` to a target buffer after each call to `next()`.
+ *
+ * @note `ChunkedPacker` must be closed by the caller as it has GPU and host resources
+ *       associated with it.
+ *
+ * @param id The RapidsBufferId for this pack operation to be included in the metadata
+ * @param table cuDF Table to chunk_pack
+ * @param bounceBuffer GPU memory to be used for packing. The buffer should be at least 1MB
+ *                     in length.
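+ *
+ * A hedged usage sketch (`hostTarget` and `stream` are illustrative names, not part of
+ * this change): size the destination with `getTotalContiguousSize`, then copy each
+ * chunk out of the bounce buffer before asking for the next one:
+ * {{{
+ * withResource(new ChunkedPacker(id, table, bounceBuffer)) { packer =>
+ *   withResource(HostMemoryBuffer.allocate(packer.getTotalContiguousSize)) { hostTarget =>
+ *     var offset = 0L
+ *     while (packer.hasNext) {
+ *       withResource(packer.next()) { chunk =>
+ *         hostTarget.copyFromMemoryBufferAsync(offset, chunk, 0, chunk.getLength, stream)
+ *         offset += chunk.getLength
+ *       }
+ *     }
+ *     stream.sync()
+ *   }
+ * }
+ * }}}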
+ */ +class ChunkedPacker( + id: RapidsBufferId, + table: Table, + bounceBuffer: DeviceMemoryBuffer) + extends Iterator[MemoryBuffer] + with Logging + with AutoCloseable { + + private var closed: Boolean = false + + // When creating cudf::chunked_pack use a pool if available, otherwise default to the + // per-device memory resource + private val chunkedPack = { + val pool = GpuDeviceManager.chunkedPackMemoryResource + val cudfChunkedPack = try { + pool.flatMap { chunkedPool => + Some(table.makeChunkedPack(bounceBuffer.getLength, chunkedPool)) + } + } catch { + case _: OutOfMemoryError => + if (!ChunkedPacker.warnedAboutPoolFallback) { + ChunkedPacker.warnedAboutPoolFallback = true + logWarning( + s"OOM while creating chunked_pack using pool sized ${pool.map(_.getMaxSize)}B. " + + "Falling back to the per-device memory resource.") + } + None + } + + // if the pool is not configured, or we got an OOM, try again with the per-device pool + cudfChunkedPack.getOrElse { + table.makeChunkedPack(bounceBuffer.getLength) + } + } + + private val tableMeta = withResource(chunkedPack.buildMetadata()) { packedMeta => + MetaUtils.buildTableMeta( + id.tableId, + chunkedPack.getTotalContiguousSize, + packedMeta.getMetadataDirectBuffer, + table.getRowCount) + } + + // take out a lease on the bounce buffer + bounceBuffer.incRefCount() + + def getTotalContiguousSize: Long = chunkedPack.getTotalContiguousSize + + def getMeta: TableMeta = { + tableMeta + } + + override def hasNext: Boolean = { + !closed && chunkedPack.hasNext + } + + def next(): MemoryBuffer = { + val bytesWritten = chunkedPack.next(bounceBuffer) + // we increment the refcount because the caller has no idea where + // this memory came from, so it should close it. + bounceBuffer.slice(0, bytesWritten) + } + + override def close(): Unit = synchronized { + if (!closed) { + closed = true + val toClose = new ArrayBuffer[AutoCloseable]() + toClose.append(chunkedPack, bounceBuffer) + toClose.safeClose() + } + } +} + +object ChunkedPacker { + private var warnedAboutPoolFallback: Boolean = false +} + +/** + * This iterator encapsulates a buffer's internal `MemoryBuffer` access + * for spill reasons. Internally, there are two known implementations: + * - either this is a "single shot" copy, where the entirety of the `RapidsBuffer` is + * already represented as a single contiguous blob of memory, then the expectation + * is that this iterator is exhausted with a single call to `next` + * - or, we have a `RapidsBuffer` that isn't contiguous. This iteration will then + * drive a `ChunkedPacker` to pack the `RapidsBuffer`'s table as needed. The + * iterator will likely need several calls to `next` to be exhausted. + * + * @param buffer `RapidsBuffer` to copy out of its tier. 
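+ *
+ * A sketch of how a receiving store can drain the iterator (it mirrors the host-store
+ * spill loop elsewhere in this change; `target` is an assumed destination buffer of at
+ * least `getTotalCopySize` bytes):
+ * {{{
+ * withResource(buffer.getCopyIterator) { copyIter =>
+ *   var offset = 0L
+ *   while (copyIter.hasNext) {
+ *     withResource(copyIter.next()) { mb =>
+ *       target.copyFromMemoryBufferAsync(offset, mb, 0, mb.getLength, stream)
+ *       offset += mb.getLength
+ *     }
+ *   }
+ *   stream.sync()
+ * }
+ * }}}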
+ */
+class RapidsBufferCopyIterator(buffer: RapidsBuffer)
+  extends Iterator[MemoryBuffer] with AutoCloseable with Logging {
+
+  private val chunkedPacker: Option[ChunkedPacker] = if (buffer.supportsChunkedPacker) {
+    Some(buffer.getChunkedPacker)
+  } else {
+    None
+  }
+
+  def isChunked: Boolean = chunkedPacker.isDefined
+
+  // this is used for the single shot case to flag when `next` has been called,
+  // to satisfy the Iterator interface
+  private var singleShotCopyHasNext: Boolean = false
+  private var singleShotBuffer: MemoryBuffer = _
+
+  if (!isChunked) {
+    singleShotCopyHasNext = true
+    singleShotBuffer = buffer.getMemoryBuffer
+  }
+
+  override def hasNext: Boolean =
+    chunkedPacker.map(_.hasNext).getOrElse(singleShotCopyHasNext)
+
+  override def next(): MemoryBuffer = {
+    require(hasNext,
+      "next called on exhausted iterator")
+    chunkedPacker.map(_.next()).getOrElse {
+      singleShotCopyHasNext = false
+      singleShotBuffer.slice(0, singleShotBuffer.getLength)
+    }
+  }
+
+  def getTotalCopySize: Long = {
+    chunkedPacker
+      .map(_.getTotalContiguousSize)
+      .getOrElse(singleShotBuffer.getLength)
+  }
+
+  override def close(): Unit = {
+    val hasNextBeforeClose = hasNext
+    val toClose = new ArrayBuffer[AutoCloseable]()
+    toClose.appendAll(chunkedPacker)
+    toClose.appendAll(Option(singleShotBuffer))
+
+    toClose.safeClose()
+    require(!hasNextBeforeClose,
+      "RapidsBufferCopyIterator was closed before being exhausted")
+  }
+}
+
 /** Interface provided by all types of RAPIDS buffers */
 trait RapidsBuffer extends AutoCloseable {
   /** The buffer identifier for this buffer. */
   val id: RapidsBufferId
 
-  /** The size of this buffer in bytes. */
-  val size: Long
+  /**
+   * The size of this buffer in bytes in its _current_ store. As the buffer goes through
+   * contiguous split (either added as a contiguous table already, or spilled to host),
+   * its size changes because contiguous_split adds its own alignment padding.
+   *
+   * @note Do not use this size to allocate a target buffer to copy into; always use
+   *       `getPackedSizeBytes`.
+   */
+  def getMemoryUsedBytes: Long
+
+  /**
+   * The size of this buffer if it has already gone through contiguous_split.
+   *
+   * @note Use this function when allocating a target buffer for spill or shuffle purposes.
+   */
+  def getPackedSizeBytes: Long = getMemoryUsedBytes
+
+  /**
+   * At spill time, obtain an iterator used to copy this buffer to a different tier.
+   */
+  def getCopyIterator: RapidsBufferCopyIterator =
+    new RapidsBufferCopyIterator(this)
 
   /** Descriptor for how the memory buffer is formatted */
-  val meta: TableMeta
+  def meta: TableMeta
 
   /** The storage tier for this buffer */
   val storageTier: StorageTier
@@ -94,6 +272,12 @@ trait RapidsBuffer extends AutoCloseable {
    */
   def getMemoryBuffer: MemoryBuffer
 
+  val supportsChunkedPacker: Boolean = false
+
+  def getChunkedPacker: ChunkedPacker = {
+    throw new NotImplementedError("not implemented for this store")
+  }
+
   /**
    * Copy the content of this buffer into the specified memory buffer, starting from the given
    * offset.
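    *
    * A small tie-in with the size methods above (a sketch; `rb` and `stream` are assumed
    * to be in scope): size the destination with `getPackedSizeBytes`, never
    * `getMemoryUsedBytes`, because the packed form includes alignment padding:
    * {{{
    * withResource(DeviceMemoryBuffer.allocate(rb.getPackedSizeBytes)) { dst =>
    *   rb.copyToMemoryBuffer(0, dst, 0, rb.getPackedSizeBytes, stream)
    * }
    * }}}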
@@ -163,7 +347,9 @@ trait RapidsBuffer extends AutoCloseable { sealed class DegenerateRapidsBuffer( override val id: RapidsBufferId, override val meta: TableMeta) extends RapidsBuffer { - override val size: Long = 0L + + override def getMemoryUsedBytes: Long = 0L + override val storageTier: StorageTier = StorageTier.DEVICE override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index ad0db09da7d..aec7e7ca22e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -19,8 +19,8 @@ package com.nvidia.spark.rapids import java.util.concurrent.ConcurrentHashMap import java.util.function.BiFunction -import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange, Rmm} -import com.nvidia.spark.rapids.Arm.withResource +import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, NvtxColor, NvtxRange, Rmm, Table} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsBufferCatalog.getExistingRapidsBufferAndAcquire import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.StorageTier.StorageTier @@ -31,6 +31,7 @@ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId} import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.vectorized.ColumnarBatch /** * Exception thrown when inserting a buffer into the catalog with a duplicate buffer ID @@ -308,7 +309,6 @@ class RapidsBufferCatalog( tableMeta: TableMeta, initialSpillPriority: Long, needsSync: Boolean): RapidsBufferHandle = synchronized { - logDebug(s"Adding buffer ${id} to ${deviceStorage}") val rapidsBuffer = deviceStorage.addBuffer( id, buffer, @@ -319,6 +319,52 @@ class RapidsBufferCatalog( makeNewHandle(id, initialSpillPriority) } + /** + * Adds a batch to the device storage. This does NOT take ownership of the + * batch, so it is the responsibility of the caller to close it. + * + * @param batch batch that will be added to the store + * @param initialSpillPriority starting spill priority value for the batch + * @param needsSync whether the spill framework should stream synchronize while adding + * this batch (defaults to true) + * @return RapidsBufferHandle handle for this RapidsBuffer + */ + def addBatch( + batch: ColumnarBatch, + initialSpillPriority: Long, + needsSync: Boolean = true): RapidsBufferHandle = { + closeOnExcept(GpuColumnVector.from(batch)) { table => + addTable(table, initialSpillPriority, needsSync) + } + } + + /** + * Adds a table to the device storage. + * + * This takes ownership of the table. The reason for this is that tables + * don't have a reference count, so we cannot cleanly capture ownership by increasing + * ref count and decreasing from the caller. 
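+   *
+   * A hedged sketch of the ownership contract (illustrative call site): the caller
+   * releases the data through the returned handle, never by closing the table:
+   * {{{
+   * val handle = catalog.addTable(table, initialSpillPriority) // the store now owns `table`
+   * // ... use the handle; do not call table.close()
+   * handle.close() // releases the underlying RapidsBuffer when no longer needed
+   * }}}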
+ * + * @param table table that will be owned by the store + * @param initialSpillPriority starting spill priority value + * @param needsSync whether the spill framework should stream synchronize while adding + * this table (defaults to true) + * @return RapidsBufferHandle handle for this RapidsBuffer + */ + def addTable( + table: Table, + initialSpillPriority: Long, + needsSync: Boolean = true): RapidsBufferHandle = { + val id = TempSpillBufferId() + val rapidsBuffer = deviceStorage.addTable( + id, + table, + initialSpillPriority, + needsSync) + registerNewBuffer(rapidsBuffer) + makeNewHandle(id, initialSpillPriority) + } + /** * Register a degenerate RapidsBufferId given a TableMeta * @note this is called from the shuffle catalogs only @@ -482,7 +528,7 @@ class RapidsBufferCatalog( if (nextSpillable != null) { // we have a buffer (nextSpillable) to spill spillAndFreeBuffer(nextSpillable, spillStore, stream) - totalSpilled += nextSpillable.size + totalSpilled += nextSpillable.getMemoryUsedBytes } } else { rmmShouldRetryAlloc = true @@ -527,7 +573,7 @@ class RapidsBufferCatalog( trySpillToMaximumSize(buffer, spillStore, stream) // copy the buffer to spillStore - val newBuffer = spillStore.copyBuffer(buffer, buffer.getMemoryBuffer, stream) + val newBuffer = spillStore.copyBuffer(buffer, stream) // once spilled, we get back a new RapidsBuffer instance in this new tier registerNewBuffer(newBuffer) @@ -555,7 +601,7 @@ class RapidsBufferCatalog( // this spillStore has a maximum size requirement (host only). We need to spill from it // in order to make room for `buffer`. val targetTotalSize = - math.max(spillStoreMaxSize.get - buffer.size, 0) + math.max(spillStoreMaxSize.get - buffer.getMemoryUsedBytes, 0) val maybeAmountSpilled = synchronousSpill(spillStore, targetTotalSize, stream) maybeAmountSpilled.foreach { amountSpilled => if (amountSpilled != 0) { @@ -570,29 +616,21 @@ class RapidsBufferCatalog( * Copies `buffer` to the `deviceStorage` store, registering a new `RapidsBuffer` in * the process * @param buffer - buffer to copy - * @param memoryBuffer - cuDF MemoryBuffer to copy from * @param stream - Cuda.Stream to synchronize on * @return - The `RapidsBuffer` instance that was added to the device store. */ def unspillBufferToDeviceStore( buffer: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBuffer = synchronized { // try to acquire the buffer, if it's already in the store // do not create a new one, else add a reference acquireBuffer(buffer.id, StorageTier.DEVICE) match { case None => - val newBuffer = deviceStorage.copyBuffer( - buffer, - memoryBuffer, - stream) + val newBuffer = deviceStorage.copyBuffer(buffer, stream) newBuffer.addReference() // add a reference since we are about to use it registerNewBuffer(newBuffer) newBuffer - case Some(existingBuffer) => - withResource(memoryBuffer) { _ => - existingBuffer - } + case Some(existingBuffer) => existingBuffer } } @@ -702,7 +740,7 @@ object RapidsBufferCatalog extends Logging { // We are going to re-initialize so make sure all of the old things were closed... 
closeImpl() assert(memoryEventHandler == null) - deviceStorage = new RapidsDeviceMemoryStore() + deviceStorage = new RapidsDeviceMemoryStore(rapidsConf.chunkedPackBounceBufferSize) diskBlockManager = new RapidsDiskBlockManager(conf) if (rapidsConf.isGdsSpillEnabled) { gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize) @@ -813,6 +851,12 @@ object RapidsBufferCatalog extends Logging { singleton.addBuffer(buffer, tableMeta, initialSpillPriority) } + def addBatch( + batch: ColumnarBatch, + initialSpillPriority: Long): RapidsBufferHandle = { + singleton.addBatch(batch, initialSpillPriority) + } + /** * Lookup the buffer that corresponds to the specified buffer handle and acquire it. * NOTE: It is the responsibility of the caller to close the buffer. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 61511f22e73..64ae09ae28a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -66,14 +66,14 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (old != null) { throw new DuplicateBufferException(s"duplicate buffer registered: ${buffer.id}") } - totalBytesStored += buffer.size + totalBytesStored += buffer.getMemoryUsedBytes // device buffers "spillability" is handled via DeviceMemoryBuffer ref counting // so spillableOnAdd should be false, all other buffer tiers are spillable at // all times. if (spillableOnAdd) { if (spillable.offer(buffer)) { - totalBytesSpillable += buffer.size + totalBytesSpillable += buffer.getMemoryUsedBytes } } } @@ -83,9 +83,9 @@ abstract class RapidsBufferStore(val tier: StorageTier) spilling.remove(id) val obj = buffers.remove(id) if (obj != null) { - totalBytesStored -= obj.size + totalBytesStored -= obj.getMemoryUsedBytes if (spillable.remove(obj)) { - totalBytesSpillable -= obj.size + totalBytesSpillable -= obj.getMemoryUsedBytes } } } @@ -119,14 +119,14 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (!spilling.contains(buffer.id) && buffers.containsKey(buffer.id)) { // try to add it to the spillable collection if (spillable.offer(buffer)) { - totalBytesSpillable += buffer.size + totalBytesSpillable += buffer.getMemoryUsedBytes logDebug(s"Buffer ${buffer.id} is spillable. " + s"total=${totalBytesStored} spillable=${totalBytesSpillable}") } // else it was already there (unlikely) } } else { if (spillable.remove(buffer)) { - totalBytesSpillable -= buffer.size + totalBytesSpillable -= buffer.getMemoryUsedBytes logDebug(s"Buffer ${buffer.id} is not spillable. " + s"total=${totalBytesStored}, spillable=${totalBytesSpillable}") } // else it was already removed @@ -138,8 +138,8 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (buffer != null) { // mark the id as "spilling" (this buffer is in the middle of a spill operation) spilling.add(buffer.id) - totalBytesSpillable -= buffer.size - logDebug(s"Spilling buffer ${buffer.id}. size=${buffer.size} " + + totalBytesSpillable -= buffer.getMemoryUsedBytes + logDebug(s"Spilling buffer ${buffer.id}. size=${buffer.getMemoryUsedBytes} " + s"total=${totalBytesStored}, new spillable=${totalBytesSpillable}") } buffer @@ -195,17 +195,13 @@ abstract class RapidsBufferStore(val tier: StorageTier) * (i.e.: this method will not take ownership of the incoming buffer object). 
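   *
   * A sketch of the call site in the catalog's spill path (simplified from
   * `spillAndFreeBuffer` in this change):
   * {{{
   * val newBuffer = spillStore.copyBuffer(buffer, stream)
   * registerNewBuffer(newBuffer) // a new RapidsBuffer instance now exists in the spill tier
   * }}}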
* This does not need to update the catalog, the caller is responsible for that. * @param buffer data from another store - * @param memoryBuffer memory buffer obtained from the specified Rapids buffer. The ownership - * for `memoryBuffer` is transferred to this store. The store may close - * `memoryBuffer` if necessary. * @param stream CUDA stream to use for copy or null * @return the new buffer that was created */ def copyBuffer( buffer: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { - freeOnExcept(createBuffer(buffer, memoryBuffer, stream)) { newBuffer => + freeOnExcept(createBuffer(buffer, stream)) { newBuffer => addBuffer(newBuffer) newBuffer } @@ -227,15 +223,11 @@ abstract class RapidsBufferStore(val tier: StorageTier) * @note DO NOT close the buffer unless adding a reference! * @note `createBuffer` impls should synchronize against `stream` before returning, if needed. * @param buffer data from another store - * @param memoryBuffer memory buffer obtained from the specified Rapids buffer. The ownership - * for `memoryBuffer` is transferred to this store. The store may close - * `memoryBuffer` if necessary. * @param stream CUDA stream to use or null * @return the new buffer that was created. */ protected def createBuffer( buffer: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase /** Update bookkeeping for a new buffer */ @@ -254,8 +246,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) /** Base class for all buffers in this store. */ abstract class RapidsBufferBase( override val id: RapidsBufferId, - override val size: Long, - override val meta: TableMeta, + _meta: TableMeta, initialSpillPriority: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends RapidsBuffer { @@ -267,6 +258,8 @@ abstract class RapidsBufferStore(val tier: StorageTier) private[this] var spillPriority: Long = initialSpillPriority + def meta: TableMeta = _meta + /** Release the underlying resources for this buffer. 
*/ protected def releaseResources(): Unit @@ -344,7 +337,6 @@ abstract class RapidsBufferStore(val tier: StorageTier) logDebug(s"Unspilling $this $id to $DEVICE") val newBuffer = catalog.unspillBufferToDeviceStore( this, - materializeMemoryBuffer, Cuda.DEFAULT_STREAM) withResource(newBuffer) { _ => return newBuffer.getDeviceMemoryBuffer @@ -360,8 +352,9 @@ abstract class RapidsBufferStore(val tier: StorageTier) materializeMemoryBuffer match { case h: HostMemoryBuffer => withResource(h) { _ => - closeOnExcept(DeviceMemoryBuffer.allocate(size)) { deviceBuffer => - logDebug(s"copying from host $h to device $deviceBuffer") + closeOnExcept(DeviceMemoryBuffer.allocate(h.getLength)) { deviceBuffer => + logDebug(s"copying ${h.getLength} from host $h to device $deviceBuffer " + + s"of size ${deviceBuffer.getLength}") deviceBuffer.copyFromHostBuffer(h) deviceBuffer } @@ -403,7 +396,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) freeBuffer() } } else { - logWarning(s"Trying to free an invalid buffer => $id, size = $size, $this") + logWarning(s"Trying to free an invalid buffer => $id, size = ${getMemoryUsedBytes}, $this") } } @@ -421,6 +414,6 @@ abstract class RapidsBufferStore(val tier: StorageTier) releaseResources() } - override def toString: String = s"$name buffer size=$size" + override def toString: String = s"$name buffer size=${getMemoryUsedBytes}" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index d68521c1c79..7e3be49648e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1856,6 +1856,26 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val CHUNKED_PACK_POOL_SIZE = conf("spark.rapids.sql.chunkedPack.poolSize") + .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + + "scratch space, needed during spill from GPU to host memory. As a rule of thumb, each " + + "column should see around 200B that will be allocated from this pool. " + + "With the default of 10MB, a table of ~60,000 columns can be spilled using only this " + + "pool. If this config is 0B, or if allocations fail, the plugin will retry with " + + "the regular GPU memory resource.") + .internal() + .bytesConf(ByteUnit.BYTE) + .createWithDefault(10L*1024*1024) + + val CHUNKED_PACK_BOUNCE_BUFFER_SIZE = conf("spark.rapids.sql.chunkedPack.bounceBufferSize") + .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + + "bounce buffer, needed during spill from GPU to host memory. 
") + .internal() + .bytesConf(ByteUnit.BYTE) + .checkValue(v => v >= 1L*1024*1024, + "The chunked pack bounce buffer must be at least 1MB in size") + .createWithDefault(128L * 1024 * 1024) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -2462,6 +2482,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isAqeExchangeReuseFixupEnabled: Boolean = get(ENABLE_AQE_EXCHANGE_REUSE_FIXUP) + lazy val chunkedPackPoolSize: Long = get(CHUNKED_PACK_POOL_SIZE) + + lazy val chunkedPackBounceBufferSize: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_SIZE) + private val optimizerDefaults = Map( // this is not accurate because CPU projections do have a cost due to appending values // to each row that is produced, but this needs to be a really small number because diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 5f1a313729f..30a2c472101 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -16,48 +16,61 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.mutable + +import ai.rapids.cudf.{ColumnVector, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta +import org.apache.spark.sql.rapids.TempSpillBufferId import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch /** * Buffer storage using device memory. - * @param catalog catalog to register this store + * @param chunkedPackBounceBufferSize this is the size of the bounce buffer to be used + * during spill in chunked_pack. The parameter defaults to 128MB, + * with a rule-of-thumb of 1MB per SM. 
*/ -class RapidsDeviceMemoryStore +class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024) extends RapidsBufferStore(StorageTier.DEVICE) { // The RapidsDeviceMemoryStore handles spillability via ref counting override protected def spillableOnAdd: Boolean = false + // bounce buffer to be used during chunked pack in GPU to host memory spill + private var chunkedPackBounceBuffer: DeviceMemoryBuffer = + DeviceMemoryBuffer.allocate(chunkedPackBounceBufferSize) + override protected def createBuffer( other: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { - val deviceBuffer = { - memoryBuffer match { - case d: DeviceMemoryBuffer => d - case h: HostMemoryBuffer => - withResource(h) { _ => - closeOnExcept(DeviceMemoryBuffer.allocate(other.size)) { deviceBuffer => + val memoryBuffer = withResource(other.getCopyIterator) { copyIterator => + copyIterator.next() + } + withResource(memoryBuffer) { _ => + val deviceBuffer = { + memoryBuffer match { + case d: DeviceMemoryBuffer => d + case h: HostMemoryBuffer => + closeOnExcept(DeviceMemoryBuffer.allocate(memoryBuffer.getLength)) { deviceBuffer => logDebug(s"copying from host $h to device $deviceBuffer") deviceBuffer.copyFromHostBuffer(h, stream) deviceBuffer } - } - case b => throw new IllegalStateException(s"Unrecognized buffer: $b") + case b => throw new IllegalStateException(s"Unrecognized buffer: $b") + } } + new RapidsDeviceMemoryBuffer( + other.id, + deviceBuffer.getLength, + other.meta, + deviceBuffer, + other.getSpillPriority) } - new RapidsDeviceMemoryBuffer( - other.id, - other.size, - other.meta, - deviceBuffer, - other.getSpillPriority) } /** @@ -93,11 +106,42 @@ class RapidsDeviceMemoryStore s"uncompressed=${rapidsBuffer.meta.bufferMeta.uncompressedSize}, " + s"meta_id=${tableMeta.bufferMeta.id}, " + s"meta_size=${tableMeta.bufferMeta.size}]") - addDeviceBuffer(rapidsBuffer, needsSync) + addBuffer(rapidsBuffer, needsSync) rapidsBuffer } } + /** + * Adds a table to the device storage. + * + * This takes ownership of the table. + * + * This function is called only from the RapidsBufferCatalog, under the + * catalog lock. + * + * @param id the RapidsBufferId to use for this table + * @param table table that will be owned by the store + * @param initialSpillPriority starting spill priority value + * @param needsSync whether the spill framework should stream synchronize while adding + * this table (defaults to true) + * @return the RapidsBuffer instance that was added. 
+ */ + def addTable( + id: TempSpillBufferId, + table: Table, + initialSpillPriority: Long, + needsSync: Boolean): RapidsBuffer = { + val rapidsTable = new RapidsTable( + id, + table, + initialSpillPriority) + freeOnExcept(rapidsTable) { _ => + addBuffer(rapidsTable, needsSync) + rapidsTable.updateSpillability() + rapidsTable + } + } + /** * Adds a device buffer to the spill framework, stream synchronizing with the producer * stream to ensure that the buffer is fully materialized, and can be safely copied @@ -105,8 +149,8 @@ class RapidsDeviceMemoryStore * * @param needsSync true if we should stream synchronize before adding the buffer */ - private def addDeviceBuffer( - buffer: RapidsDeviceMemoryBuffer, + private def addBuffer( + buffer: RapidsBufferBase, needsSync: Boolean): Unit = { if (needsSync) { Cuda.DEFAULT_STREAM.sync() @@ -122,15 +166,249 @@ class RapidsDeviceMemoryStore doSetSpillable(buffer, spillable) } + /** + * A per cuDF column event handler that handles calls to .close() + * inside of the `ColumnVector` lock. + */ + class RapidsDeviceColumnEventHandler + extends ColumnVector.EventHandler { + + // Every RapidsTable that references this column has an entry in this map. + // The value represents the number of times (normally 1) that a ColumnVector + // appears in the RapidsTable. This is also the ColumnVector refCount at which + // the column is considered spillable. + // The map is protected via the ColumnVector lock. + private val registration = new mutable.HashMap[RapidsTable, Int]() + + /** + * Every RapidsTable iterates through its columns and either creates + * a `ColumnTracking` object and associates it with the column's + * `eventHandler` or calls into the existing one, and registers itself. + * + * The registration has two goals: it accounts for repetition of a column + * in a `RapidsTable`. If a table has the same column repeated it must adjust + * the refCount at which this column is considered spillable. + * + * The second goal is to account for aliasing. If two tables alias this column + * we are going to mark it as non spillable. + * + * @param rapidsTable - the table that is registering itself with this tracker + */ + def register(rapidsTable: RapidsTable, repetition: Int): Unit = { + registration.put(rapidsTable, repetition) + } + + /** + * This is invoked during `RapidsTable.free` in order to remove the entry + * in `registration`. + * @param rapidsTable - the table that is de-registering itself + */ + def deregister(rapidsTable: RapidsTable): Unit = { + registration.remove(rapidsTable) + } + + // called with the cudfCv lock held from cuDF's side + override def onClosed(cudfCv: ColumnVector, refCount: Int): Unit = { + // we only handle spillability if there is a single table registered + // (no aliasing) + if (registration.size == 1) { + val (rapidsTable, spillableRefCount) = registration.head + if (spillableRefCount == refCount) { + rapidsTable.onColumnSpillable(cudfCv) + } + } + } + } + + /** + * A `RapidsTable` is the spill store holder of a cuDF `Table`. + * + * The table is not contiguous in GPU memory. Instead, this `RapidsBuffer` instance + * allows us to use the cuDF chunked_pack API to make the table contiguous as the spill + * is happening. + * + * This class owns the cuDF table and will close it when `close` is called. 
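+   *
+   * Spillability, as exercised by the tests in this change (a sketch; `types` is
+   * assumed to match the table's schema): obtaining a batch makes the table
+   * non-spillable until every extra column reference is closed:
+   * {{{
+   * val handle = catalog.addTable(table, spillPriority) // spillable once added
+   * withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
+   *   withResource(rapidsBuffer.getColumnarBatch(types)) { batch =>
+   *     // the batch references the table's columns, so the table is not spillable here
+   *   }
+   * }
+   * // the extra column references are closed, so the table is spillable again
+   * }}}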
+ * + * @param id the `RapidsBufferId` this table is associated with + * @param table the cuDF table that we are managing + * @param spillPriority a starting spill priority + */ + class RapidsTable( + id: TempSpillBufferId, + table: Table, + spillPriority: Long) + extends RapidsBufferBase( + id, + null, + spillPriority) { + + /** The storage tier for this buffer */ + override val storageTier: StorageTier = StorageTier.DEVICE + + override val supportsChunkedPacker: Boolean = true + + private var initializedChunkedPacker: Boolean = false + + lazy val chunkedPacker: ChunkedPacker = { + val packer = new ChunkedPacker(id, table, chunkedPackBounceBuffer) + initializedChunkedPacker = true + packer + } + + // This is the current size in batch form. It is to be used while this + // table hasn't migrated to another store. + private val unpackedSizeInBytes: Long = GpuColumnVector.getTotalDeviceMemoryUsed(table) + + // By default all columns are NOT spillable since we are not the only owners of + // the columns (the caller is holding onto a ColumnarBatch that will be closed + // after instantiation, triggering onClosed callbacks) + // This hash set contains the columns that are currently spillable. + private val columnSpillability = new ConcurrentHashMap[ColumnVector, Boolean]() + + private val numDistinctColumns = + (0 until table.getNumberOfColumns).map(table.getColumn).distinct.size + + // we register our event callbacks as the very first action to deal with + // spillability + registerOnCloseEventHandler() + + /** Release the underlying resources for this buffer. */ + override protected def releaseResources(): Unit = { + table.close() + } + + override def meta: TableMeta = { + chunkedPacker.getMeta + } + + override def getMemoryUsedBytes: Long = unpackedSizeInBytes + + override def getPackedSizeBytes: Long = getChunkedPacker.getTotalContiguousSize + + override def getChunkedPacker: ChunkedPacker = { + chunkedPacker + } + + /** + * Mark a column as spillable + * + * @param column the ColumnVector to mark as spillable + */ + def onColumnSpillable(column: ColumnVector): Unit = { + columnSpillability.put(column, true) + updateSpillability() + } + + /** + * Update the spillability state of this RapidsTable. This is invoked from + * two places: + * + * - from the onColumnSpillable callback, which is invoked from a + * ColumnVector.EventHandler.onClosed callback. + * + * - after adding a table to the store to mark the table as spillable if + * all columns are spillable. + */ + def updateSpillability(): Unit = { + doSetSpillable(this, columnSpillability.size == numDistinctColumns) + } + + /** + * Produce a `ColumnarBatch` from our table, and in the process make ourselves + * not spillable. + * + * @param sparkTypes the spark data types the batch should have + */ + override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = { + columnSpillability.clear() + doSetSpillable(this, false) + GpuColumnVector.from(table, sparkTypes) + } + + /** + * Get the underlying memory buffer. This may be either a HostMemoryBuffer or a + * DeviceMemoryBuffer depending on where the buffer currently resides. + * The caller must have successfully acquired the buffer beforehand. + * + * @see [[addReference]] + * @note It is the responsibility of the caller to close the buffer. 
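+   *
+   * @note `RapidsTable` keeps its data as a regular, non-contiguous `Table`, so this
+   *       method is unsupported here; spill copies go through `getCopyIterator` and
+   *       the chunked packer instead, as the override below makes explicit.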
+ */ + override def getMemoryBuffer: MemoryBuffer = { + throw new UnsupportedOperationException( + "RapidsDeviceMemoryBatch doesn't support getMemoryBuffer") + } + + override def free(): Unit = { + // lets remove our handler from the chain of handlers for each column + removeOnCloseEventHandler() + super.free() + if (initializedChunkedPacker) { + chunkedPacker.close() + initializedChunkedPacker = false + } + } + + private def registerOnCloseEventHandler(): Unit = { + val columns = (0 until table.getNumberOfColumns).map(table.getColumn) + // cudfColumns could contain duplicates. We need to take this into account when we are + // deciding the floor refCount for a duplicated column + val repetitionPerColumn = new mutable.HashMap[ColumnVector, Int]() + columns.foreach { col => + val repetitionCount = repetitionPerColumn.getOrElse(col, 0) + repetitionPerColumn(col) = repetitionCount + 1 + } + repetitionPerColumn.foreach { case (distinctCv, repetition) => + // lock the column because we are setting its event handler, and we are inspecting + // its refCount. + distinctCv.synchronized { + val eventHandler = distinctCv.getEventHandler match { + case null => + val eventHandler = new RapidsDeviceColumnEventHandler + distinctCv.setEventHandler(eventHandler) + eventHandler + case existing: RapidsDeviceColumnEventHandler => + existing + case other => + throw new IllegalStateException( + s"Invalid column event handler $other") + } + eventHandler.register(this, repetition) + if (repetition == distinctCv.getRefCount) { + onColumnSpillable(distinctCv) + } + } + } + } + + // this method is called from free() + private def removeOnCloseEventHandler(): Unit = { + val distinctColumns = + (0 until table.getNumberOfColumns).map(table.getColumn).distinct + distinctColumns.foreach { distinctCv => + distinctCv.synchronized { + distinctCv.getEventHandler match { + case eventHandler: RapidsDeviceColumnEventHandler => + eventHandler.deregister(this) + case t => + throw new IllegalStateException( + s"Invalid column event handler $t") + } + } + } + } + } + class RapidsDeviceMemoryBuffer( id: RapidsBufferId, size: Long, meta: TableMeta, contigBuffer: DeviceMemoryBuffer, spillPriority: Long) - extends RapidsBufferBase(id, size, meta, spillPriority) + extends RapidsBufferBase(id, meta, spillPriority) with MemoryBuffer.EventHandler { + override def getMemoryUsedBytes(): Long = size + override val storageTier: StorageTier = StorageTier.DEVICE // If this require triggers, we are re-adding a `DeviceMemoryBuffer` outside of @@ -204,4 +482,9 @@ class RapidsDeviceMemoryStore super.free() } } + override def close(): Unit = { + super.close() + chunkedPackBounceBuffer.close() + chunkedPackBounceBuffer = null + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index 10d9da5652a..1fcb9b94119 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -34,8 +34,12 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) override protected def createBuffer( incoming: RapidsBuffer, - incomingBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { + // assuming that the disk store gets contiguous buffers + val incomingBuffer = + withResource(incoming.getCopyIterator) { incomingCopyIterator => + incomingCopyIterator.next() + } withResource(incomingBuffer) { _ => val hostBuffer = incomingBuffer match { case 
h: HostMemoryBuffer => h @@ -55,11 +59,11 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) } else { copyBufferToPath(hostBuffer, path, append = false) } - logDebug(s"Spilled to $path $fileOffset:${incoming.size}") + logDebug(s"Spilled to $path $fileOffset:${incomingBuffer.getLength}") new RapidsDiskBuffer( id, fileOffset, - incoming.size, + incomingBuffer.getLength, incoming.meta, incoming.getSpillPriority) } @@ -94,9 +98,11 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) meta: TableMeta, spillPriority: Long) extends RapidsBufferBase( - id, size, meta, spillPriority) { + id, meta, spillPriority) { private[this] var hostBuffer: Option[HostMemoryBuffer] = None + override def getMemoryUsedBytes(): Long = size + override val storageTier: StorageTier = StorageTier.DISK override def getMemoryBuffer: MemoryBuffer = synchronized { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 4f94939259f..70160f5b9d0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -37,8 +37,14 @@ class RapidsGdsStore( extends RapidsBufferStore(StorageTier.GDS) { private[this] val batchSpiller = new BatchSpiller() - override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer, + override protected def createBuffer( + other: RapidsBuffer, stream: Cuda.Stream): RapidsBufferBase = { + // assume that we get 1 buffer + val otherBuffer = withResource(other.getCopyIterator) { it => + it.next() + } + withResource(otherBuffer) { _ => val deviceBuffer = otherBuffer match { case d: BaseDeviceMemoryBuffer => d @@ -59,12 +65,14 @@ class RapidsGdsStore( abstract class RapidsGdsBuffer( override val id: RapidsBufferId, - override val size: Long, + val size: Long, override val meta: TableMeta, spillPriority: Long) - extends RapidsBufferBase(id, size, meta, spillPriority) { + extends RapidsBufferBase(id, meta, spillPriority) { override val storageTier: StorageTier = StorageTier.GDS + override def getMemoryUsedBytes(): Long = size + override def getMemoryBuffer: MemoryBuffer = getDeviceMemoryBuffer } @@ -122,12 +130,12 @@ class RapidsGdsStore( CuFile.writeDeviceBufferToFile(path, 0, deviceBuffer) 0 } - logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS") + logDebug(s"Spilled to $path $fileOffset:${deviceBuffer.getLength} via GDS") new RapidsGdsSingleShotBuffer( id, path, fileOffset, - other.size, + deviceBuffer.getLength, other.meta, other.getSpillPriority) } @@ -169,7 +177,7 @@ class RapidsGdsStore( id, currentFile, currentOffset, - other.size, + deviceBuffer.getLength, other.meta, other.getSpillPriority) currentOffset += alignUp(deviceBuffer.getLength) @@ -223,6 +231,8 @@ class RapidsGdsStore( var isPending: Boolean = true) extends RapidsGdsBuffer(id, size, meta, spillPriority) { + override def getMemoryUsedBytes(): Long = size + override def materializeMemoryBuffer: MemoryBuffer = this.synchronized { closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer => if (isPending) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 360cb7fcf78..2a46b3f58b2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ 
-16,8 +16,8 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool} -import com.nvidia.spark.rapids.Arm.withResource +import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange, PinnedMemoryPool} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_DIRECT_OFFSET, HOST_MEMORY_BUFFER_PAGEABLE_OFFSET, HOST_MEMORY_BUFFER_PINNED_OFFSET} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -42,10 +42,15 @@ class RapidsHostMemoryStore( override def getMaxSize: Option[Long] = Some(maxSize) - private def allocateHostBuffer(size: Long): (HostMemoryBuffer, AllocationMode) = { - var buffer: HostMemoryBuffer = PinnedMemoryPool.tryAllocate(size) - if (buffer != null) { - return (buffer, Pinned) + private def allocateHostBuffer( + size: Long, + preferPinned: Boolean = true): (HostMemoryBuffer, AllocationMode) = { + var buffer: HostMemoryBuffer = null + if (preferPinned) { + buffer = PinnedMemoryPool.tryAllocate(size) + if (buffer != null) { + return (buffer, Pinned) + } } val allocation = addressAllocator.allocate(size) @@ -62,29 +67,45 @@ class RapidsHostMemoryStore( (HostMemoryBuffer.allocate(size, false), Direct) } - override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer, + override protected def createBuffer( + other: RapidsBuffer, stream: Cuda.Stream): RapidsBufferBase = { - withResource(otherBuffer) { _ => - val (hostBuffer, allocationMode) = allocateHostBuffer(other.size) - try { - otherBuffer match { - case devBuffer: DeviceMemoryBuffer => - hostBuffer.copyFromDeviceBuffer(devBuffer, stream) - case _ => - throw new IllegalStateException("copying from buffer without device memory") + withResource(other.getCopyIterator) { otherBufferIterator => + val isChunked = otherBufferIterator.isChunked + val totalCopySize = otherBufferIterator.getTotalCopySize + val (hostBuffer, allocationMode) = allocateHostBuffer(totalCopySize) + closeOnExcept(hostBuffer) { _ => + withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ => + var hostOffset = 0L + val start = System.nanoTime() + while (otherBufferIterator.hasNext) { + val otherBuffer = otherBufferIterator.next() + withResource(otherBuffer) { _ => + otherBuffer match { + case devBuffer: DeviceMemoryBuffer => + hostBuffer.copyFromMemoryBufferAsync( + hostOffset, devBuffer, 0, otherBuffer.getLength, stream) + hostOffset += otherBuffer.getLength + case _ => + throw new IllegalStateException("copying from buffer without device memory") + } + } + } + stream.sync() + val end = System.nanoTime() + val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong + val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong + logDebug(s"Spill to host (mode=$allocationMode, chunked=$isChunked) " + + s"size=$szMB MiB bandwidth=$bw MiB/sec") } - } catch { - case e: Exception => - hostBuffer.close() - throw e + new RapidsHostMemoryBuffer( + other.id, + totalCopySize, + other.meta, + applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), + hostBuffer, + allocationMode) } - new RapidsHostMemoryBuffer( - other.id, - other.size, - other.meta, - applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), - hostBuffer, - allocationMode) } } @@ -103,7 +124,7 @@ class RapidsHostMemoryStore( buffer: HostMemoryBuffer, 
allocationMode: AllocationMode) extends RapidsBufferBase( - id, size, meta, spillPriority) { + id, meta, spillPriority) { override val storageTier: StorageTier = StorageTier.HOST override def getMemoryBuffer: MemoryBuffer = { @@ -121,5 +142,8 @@ class RapidsHostMemoryStore( } buffer.close() } + + /** The size of this buffer in bytes. */ + override def getMemoryUsedBytes: Long = size } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 689645fae30..4a3835fba1c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -97,7 +97,7 @@ class SpillableColumnarBatchImpl ( } override lazy val sizeInBytes: Long = - withRapidsBuffer(_.size) + withRapidsBuffer(_.getMemoryUsedBytes) /** * Set a new spill priority. @@ -206,14 +206,7 @@ object SpillableColumnarBatch { val buff = cv.getBuffer RapidsBufferCatalog.addBuffer(buff, cv.getTableMeta, initialSpillPriority) } else { - withResource(GpuColumnVector.from(batch)) { tmpTable => - withResource(tmpTable.contiguousSplit()) { contigTables => - require(contigTables.length == 1, "Unexpected number of contiguous spit tables") - RapidsBufferCatalog.addContiguousTable( - contigTables.head, - initialSpillPriority) - } - } + RapidsBufferCatalog.addBatch(batch, initialSpillPriority) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala index 198ad9dbb86..c1d65f16980 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala @@ -83,7 +83,7 @@ class BufferSendState( withResource(requestHandler.acquireShuffleBuffer( bufferTransferRequest.bufferId())) { table => bufferMetas(ix) = table.meta.bufferMeta() - new SendBlock(bufferTransferRequest.bufferId(), table.size) + new SendBlock(bufferTransferRequest.bufferId(), table.getPackedSizeBytes) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala index a1cda5bdf57..98012d9d494 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala @@ -360,7 +360,7 @@ class RapidsShuffleIterator( try { sb = catalog.acquireBuffer(handle) cb = sb.getColumnarBatch(sparkTypes) - metricsUpdater.update(blockedTime, 1, sb.size, cb.numRows()) + metricsUpdater.update(blockedTime, 1, sb.getMemoryUsedBytes, cb.numRows()) } finally { nvtxRangeAfterGettingBatch.close() range.close() diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 605e4bd3365..c8de4a2d4fb 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -25,16 +25,23 @@ import java.util.TimeZone import scala.collection.JavaConverters._ import scala.util.{Failure, Random, Success, Try} +import org.scalatest.BeforeAndAfterAll + import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row, SparkSession} import 
org.apache.spark.sql.catalyst.expressions.{Cast, Expression, NamedExpression} import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types._ -class CastOpSuite extends GpuExpressionTestSuite { +class CastOpSuite extends GpuExpressionTestSuite with BeforeAndAfterAll { import CastOpSuite._ + override def afterAll(): Unit = { + TrampolineUtil.cleanupAnyExistingSession() + } + private val sparkConf = new SparkConf() .set(RapidsConf.ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES.key, "true") .set(RapidsConf.ENABLE_CAST_STRING_TO_FLOAT.key, "true") diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala index 88e0ab857ae..5950617ea48 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala @@ -212,40 +212,48 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { } test("multiple calls to unspill return existing DEVICE buffer") { - val deviceStore = spy(new RapidsDeviceMemoryStore) - val mockStore = mock[RapidsBufferStore] - withResource( - new RapidsHostMemoryStore(10000, 1000)) { hostStore => - deviceStore.setSpillStore(hostStore) - hostStore.setSpillStore(mockStore) - val catalog = new RapidsBufferCatalog(deviceStore) - val handle = withResource(DeviceMemoryBuffer.allocate(1024)) { buff => - val meta = MetaUtils.getTableMetaNoTable(buff) - catalog.addBuffer( - buff, meta, -1) - } - withResource(handle) { _ => - catalog.synchronousSpill(deviceStore, 0) - val acquiredHostBuffer = catalog.acquireBuffer(handle) - withResource(acquiredHostBuffer) { _ => - assertResult(HOST)(acquiredHostBuffer.storageTier) - val unspilled = - catalog.unspillBufferToDeviceStore( + withResource(spy(new RapidsDeviceMemoryStore)) { deviceStore => + val mockStore = mock[RapidsBufferStore] + withResource( + new RapidsHostMemoryStore(10000, 1000)) { hostStore => + deviceStore.setSpillStore(hostStore) + hostStore.setSpillStore(mockStore) + val catalog = new RapidsBufferCatalog(deviceStore) + val handle = withResource(DeviceMemoryBuffer.allocate(1024)) { buff => + val meta = MetaUtils.getTableMetaNoTable(buff) + catalog.addBuffer( + buff, meta, -1) + } + withResource(handle) { _ => + catalog.synchronousSpill(deviceStore, 0) + val acquiredHostBuffer = catalog.acquireBuffer(handle) + val unspilled = withResource(acquiredHostBuffer) { _ => + assertResult(HOST)(acquiredHostBuffer.storageTier) + val unspilled = + catalog.unspillBufferToDeviceStore( + acquiredHostBuffer, + Cuda.DEFAULT_STREAM) + withResource(unspilled) { _ => + assertResult(DEVICE)(unspilled.storageTier) + } + val unspilledSame = catalog.unspillBufferToDeviceStore( acquiredHostBuffer, - acquiredHostBuffer.getMemoryBuffer, Cuda.DEFAULT_STREAM) - withResource(unspilled) { _ => - assertResult(DEVICE)(unspilled.storageTier) + withResource(unspilledSame) { _ => + assertResult(unspilled)(unspilledSame) + } + // verify that we invoked the copy function exactly once + verify(deviceStore, times(1)).copyBuffer(any(), any()) + unspilled } val unspilledSame = catalog.unspillBufferToDeviceStore( acquiredHostBuffer, - acquiredHostBuffer.getMemoryBuffer, Cuda.DEFAULT_STREAM) withResource(unspilledSame) { _ => assertResult(unspilled)(unspilledSame) } // verify that we invoked the copy function exactly once - verify(deviceStore, 
times(1)).copyBuffer(any(), any(), any()) + verify(deviceStore, times(1)).copyBuffer(any(), any()) } } } @@ -322,8 +330,8 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { var _acquireAttempts: Int = acquireAttempts var currentPriority: Long = initialPriority override val id: RapidsBufferId = bufferId - override val size: Long = 0 - override val meta: TableMeta = tableMeta + override def getMemoryUsedBytes: Long = 0 + override def meta: TableMeta = tableMeta override val storageTier: StorageTier = tier override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = null override def getMemoryBuffer: MemoryBuffer = null diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index e73a68b2198..3b3b3820b07 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -21,7 +21,7 @@ import java.math.RoundingMode import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} +import ai.rapids.cudf.{ColumnVector, ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -34,13 +34,28 @@ import org.apache.spark.sql.rapids.RapidsDiskBlockManager import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType} class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { - private def buildContiguousTable(): ContiguousTable = { - withResource(new Table.TestBuilder() + private def buildTable(): Table = { + new Table.TestBuilder() .column(5, null.asInstanceOf[java.lang.Integer], 3, 1) .column("five", "two", null, null) - .column(5.0, 2.0, 3.0, 1.0) + .column(5.0D, 2.0D, 3.0D, 1.0D) .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123) - .build()) { table => + .build() + } + + private def buildTableWithDuplicate(): Table = { + withResource(ColumnVector.fromInts(5, null.asInstanceOf[java.lang.Integer], 3, 1)) { intCol => + withResource(ColumnVector.fromStrings("five", "two", null, null)) { stringCol => + withResource(ColumnVector.fromDoubles(5.0, 2.0, 3.0, 1.0)) { doubleCol => + // add intCol twice + new Table(intCol, intCol, stringCol, doubleCol) + } + } + } + } + + private def buildContiguousTable(): ContiguousTable = { + withResource(buildTable()) { table => table.contiguousSplit()(0) } } @@ -62,7 +77,153 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { } } - test("a table is not spillable until the owner closes it") { + test("a non-contiguous table is spillable and it is handed over to the store") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTable() + catalog.addTable(table, spillPriority) + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("a non-contiguous table becomes non-spillable when batch is obtained") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = 
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala
index e73a68b2198..3b3b3820b07 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala
@@ -21,7 +21,7 @@ import java.math.RoundingMode
 
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table}
+import ai.rapids.cudf.{ColumnVector, ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table}
 import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
@@ -34,13 +34,28 @@ import org.apache.spark.sql.rapids.RapidsDiskBlockManager
 import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType}
 
 class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar {
-  private def buildContiguousTable(): ContiguousTable = {
-    withResource(new Table.TestBuilder()
+  private def buildTable(): Table = {
+    new Table.TestBuilder()
       .column(5, null.asInstanceOf[java.lang.Integer], 3, 1)
       .column("five", "two", null, null)
-      .column(5.0, 2.0, 3.0, 1.0)
+      .column(5.0D, 2.0D, 3.0D, 1.0D)
       .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123)
-      .build()) { table =>
+      .build()
+  }
+
+  private def buildTableWithDuplicate(): Table = {
+    withResource(ColumnVector.fromBoxedInts(5, null.asInstanceOf[java.lang.Integer], 3, 1)) { intCol =>
+      withResource(ColumnVector.fromStrings("five", "two", null, null)) { stringCol =>
+        withResource(ColumnVector.fromDoubles(5.0, 2.0, 3.0, 1.0)) { doubleCol =>
+          // add intCol twice
+          new Table(intCol, intCol, stringCol, doubleCol)
+        }
+      }
+    }
+  }
+
+  private def buildContiguousTable(): ContiguousTable = {
+    withResource(buildTable()) { table =>
       table.contiguousSplit()(0)
     }
   }
@@ -62,7 +77,153 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar {
     }
   }
 
-  test("a table is not spillable until the owner closes it") {
+  test("a non-contiguous table is spillable and it is handed over to the store") {
+    withResource(new RapidsDeviceMemoryStore) { store =>
+      val catalog = spy(new RapidsBufferCatalog(store))
+      val spillPriority = 3
+      val table = buildTable()
+      catalog.addTable(table, spillPriority)
+      val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table)
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+    }
+  }
+
+  test("a non-contiguous table becomes non-spillable when batch is obtained") {
+    withResource(new RapidsDeviceMemoryStore) { store =>
+      val catalog = spy(new RapidsBufferCatalog(store))
+      val spillPriority = 3
+      val table = buildTable()
+      val handle = catalog.addTable(table, spillPriority)
+      val types: Array[DataType] =
+        Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray
+      val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table)
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+      val batch = withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
+        rapidsBuffer.getColumnarBatch(types)
+      }
+      withResource(batch) { _ =>
+        assertResult(buffSize)(store.currentSize)
+        assertResult(0)(store.currentSpillableSize)
+      }
+      assertResult(buffSize)(store.currentSpillableSize)
+    }
+  }
+
+  test("a non-contiguous table is non-spillable until all columns are returned") {
+    withResource(new RapidsDeviceMemoryStore) { store =>
+      val catalog = spy(new RapidsBufferCatalog(store))
+      val spillPriority = 3
+      val table = buildTable()
+      val handle = catalog.addTable(table, spillPriority)
+      val types: Array[DataType] =
+        Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray
+      val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table)
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+      // incRefCount all the columns via `batch`
+      val batch = withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
+        rapidsBuffer.getColumnarBatch(types)
+      }
+      val columns = GpuColumnVector.extractBases(batch)
+      withResource(columns.head) { _ =>
+        columns.head.incRefCount()
+        withResource(batch) { _ =>
+          assertResult(buffSize)(store.currentSize)
+          assertResult(0)(store.currentSpillableSize)
+        }
+        // still 0 after the batch is closed, because of the extra incRefCount
+        // for columns.head
+        assertResult(0)(store.currentSpillableSize)
+      }
+      // columns.head is closed, so now our RapidsTable is spillable again
+      assertResult(buffSize)(store.currentSpillableSize)
+    }
+  }
+
+  test("an aliased non-contiguous table is not spillable (until closing the alias)") {
+    withResource(new RapidsDeviceMemoryStore) { store =>
+      val catalog = spy(new RapidsBufferCatalog(store))
+      val spillPriority = 3
+      val table = buildTable()
+      val handle = catalog.addTable(table, spillPriority)
+      val types: Array[DataType] =
+        Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray
+      val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table)
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+      val aliasHandle = withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
+        // extract the batch from the table we added, and add it back as a batch
+        withResource(rapidsBuffer.getColumnarBatch(types)) { batch =>
+          catalog.addBatch(batch, spillPriority)
+        }
+      } // we now have two copies in the store
+      assertResult(buffSize * 2)(store.currentSize)
+      assertResult(0)(store.currentSpillableSize)
+
+      aliasHandle.close() // remove the alias
+
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+    }
+  }
+
+  test("an aliased non-contiguous table is not spillable (until closing the original)") {
+    withResource(new RapidsDeviceMemoryStore) { store =>
+      val catalog = spy(new RapidsBufferCatalog(store))
+      val spillPriority = 3
+      val table = buildTable()
+      val handle = catalog.addTable(table, spillPriority)
+      val types: Array[DataType] =
+        Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray
+      val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table)
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+      withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
+        // extract the batch from the table we added, and add it back as a batch
+        withResource(rapidsBuffer.getColumnarBatch(types)) { batch =>
+          catalog.addBatch(batch, spillPriority)
+        }
+      } // we now have two copies in the store
+      assertResult(buffSize * 2)(store.currentSize)
+      assertResult(0)(store.currentSpillableSize)
+
+      handle.close() // remove the original
+
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+    }
+  }
+
+  test("a non-contiguous table supports duplicated columns") {
+    withResource(new RapidsDeviceMemoryStore) { store =>
+      val catalog = spy(new RapidsBufferCatalog(store))
+      val spillPriority = 3
+      val table = buildTableWithDuplicate()
+      val handle = catalog.addTable(table, spillPriority)
+      val types: Array[DataType] =
+        Seq(IntegerType, IntegerType, StringType, DoubleType).toArray
+      val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table)
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+      withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
+        // extract the batch from the table we added, and add it back as a batch
+        withResource(rapidsBuffer.getColumnarBatch(types)) { batch =>
+          catalog.addBatch(batch, spillPriority)
+        }
+      } // we now have two copies in the store
+      assertResult(buffSize * 2)(store.currentSize)
+      assertResult(0)(store.currentSpillableSize)
+
+      handle.close() // remove the original
+
+      assertResult(buffSize)(store.currentSize)
+      assertResult(buffSize)(store.currentSpillableSize)
+    }
+  }
+
+  test("a contiguous table is not spillable until the owner closes it") {
     withResource(new RapidsDeviceMemoryStore) { store =>
       val catalog = spy(new RapidsBufferCatalog(store))
       val spillPriority = 3
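
One accounting rule from the aliasing tests above deserves a standalone statement: registering the same device columns under a second handle via `addBatch` doubles `currentSize` but drops `currentSpillableSize` to zero, and closing either handle restores spillability for the surviving copy. A sketch of that rule (illustrative; same assumed fixtures as in the tests):

    val handle = catalog.addTable(buildTable(), spillPriority)
    val alias = withResource(catalog.acquireBuffer(handle)) { rb =>
      withResource(rb.getColumnarBatch(types)) { batch =>
        catalog.addBatch(batch, spillPriority) // second handle, same columns
      }
    }
    assert(store.currentSpillableSize == 0) // two handles share the columns
    alias.close() // closing either handle...
    assert(store.currentSpillableSize == store.currentSize) // ...unpins the other
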
@@ -226,13 +387,6 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar {
     }
   }
 
-  test("cannot receive spilled buffers") {
-    withResource(new RapidsDeviceMemoryStore) { store =>
-      assertThrows[IllegalStateException](store.copyBuffer(
-        mock[RapidsBuffer], mock[MemoryBuffer], Cuda.DEFAULT_STREAM))
-    }
-  }
-
   test("size statistics") {
 
     withResource(new RapidsDeviceMemoryStore) { store =>
@@ -312,22 +466,22 @@
 
     override protected def createBuffer(
         b: RapidsBuffer,
-        m: MemoryBuffer,
         s: Cuda.Stream): RapidsBufferBase = {
-      withResource(m) { _ =>
-        spilledBuffers += b.id
-        new MockRapidsBuffer(b.id, b.size, b.meta, b.getSpillPriority)
-      }
+      spilledBuffers += b.id
+      new MockRapidsBuffer(b.id, b.getPackedSizeBytes, b.meta, b.getSpillPriority)
     }
 
     class MockRapidsBuffer(id: RapidsBufferId, size: Long, meta: TableMeta, spillPriority: Long)
-      extends RapidsBufferBase(id, size, meta, spillPriority) {
+      extends RapidsBufferBase(id, meta, spillPriority) {
       override protected def releaseResources(): Unit = {}
 
       override val storageTier: StorageTier = StorageTier.HOST
 
       override def getMemoryBuffer: MemoryBuffer =
        throw new UnsupportedOperationException
+
+      /** The size of this buffer in bytes. */
+      override def getMemoryUsedBytes: Long = size
     }
   }
 }
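
The store suites below replace the removed `size` field with `getMemoryUsedBytes`, while the shuffle suites further down stub `getPackedSizeBytes`. The distinction introduced by this PR: `getMemoryUsedBytes` is what a buffer occupies at its current tier, and `getPackedSizeBytes` is the chunked-pack output size used when the buffer is spilled or shipped. A Mockito sketch in the suites' own style (the sizes are made up):

    val rb = mock[RapidsBuffer]
    when(rb.getMemoryUsedBytes).thenReturn(2048L) // bytes resident at rb's tier
    when(rb.getPackedSizeBytes).thenReturn(1024L) // bytes once chunk-packed
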
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
index 7df34c90cfd..64ee8b7f42b 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
@@ -68,7 +68,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
       ArgumentMatchers.eq(handle.id), ArgumentMatchers.eq(StorageTier.DEVICE))
     withResource(catalog.acquireBuffer(handle)) { buffer =>
       assertResult(StorageTier.DISK)(buffer.storageTier)
-      assertResult(bufferSize)(buffer.size)
+      assertResult(bufferSize)(buffer.getMemoryUsedBytes)
       assertResult(handle.id)(buffer.id)
       assertResult(spillPriority)(buffer.getSpillPriority)
     }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
index d1be2ae104a..176647e6a8f 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
@@ -92,7 +92,7 @@ class RapidsGdsStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
         withResource(catalog.acquireBuffer(handle)) { buffer =>
           assertResult(StorageTier.GDS)(buffer.storageTier)
           assertResult(id)(buffer.id)
-          assertResult(size)(buffer.size)
+          assertResult(size)(buffer.getMemoryUsedBytes)
           assertResult(spillPriority)(buffer.getSpillPriority)
         }
       }
@@ -126,7 +126,7 @@ class RapidsGdsStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
       ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE))
     withResource(catalog.acquireBuffer(handle)) { buffer =>
       assertResult(StorageTier.GDS)(buffer.storageTier)
-      assertResult(bufferSize)(buffer.size)
+      assertResult(bufferSize)(buffer.getMemoryUsedBytes)
       assertResult(bufferId)(buffer.id)
       assertResult(spillPriority)(buffer.getSpillPriority)
     }
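
RapidsHostMemoryStoreSuite, next, verifies spills through the narrowed two-argument `copyBuffer(RapidsBuffer, Cuda.Stream)`. A sketch of the verification pattern (assumes `devStore`, `hostStore`, `catalog`, `mockStore`, and `mockBuff` wired up as in the test below):

    when(mockStore.copyBuffer(any(), any())).thenReturn(mockBuff)
    hostStore.setSpillStore(mockStore)
    catalog.synchronousSpill(devStore, 0) // push everything below the 0-byte target
    verify(mockStore, times(1)).copyBuffer(
      ArgumentMatchers.any[RapidsBuffer], ArgumentMatchers.any[Cuda.Stream])
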
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
index 0e004423272..068f17a9023 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
 import java.io.File
 import java.math.RoundingMode
 
-import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, HostMemoryBuffer, MemoryBuffer, Table}
+import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, HostMemoryBuffer, Table}
 import com.nvidia.spark.rapids.Arm._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.ArgumentMatchers.any
@@ -85,7 +85,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar {
       ArgumentMatchers.eq(handle.id), ArgumentMatchers.eq(StorageTier.DEVICE))
     withResource(catalog.acquireBuffer(handle)) { buffer =>
       assertResult(StorageTier.HOST)(buffer.storageTier)
-      assertResult(bufferSize)(buffer.size)
+      assertResult(bufferSize)(buffer.getMemoryUsedBytes)
       assertResult(handle.id)(buffer.id)
       assertResult(spillPriority)(buffer.getSpillPriority)
     }
@@ -175,7 +175,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar {
       override def getDiskPath(diskBlockManager: RapidsDiskBlockManager): File = null
     })
     when(mockStore.getMaxSize).thenAnswer(_ => None)
-    when(mockStore.copyBuffer(any(), any(), any())).thenReturn(mockBuff)
+    when(mockStore.copyBuffer(any(), any())).thenReturn(mockBuff)
     when(mockStore.tier) thenReturn (StorageTier.DISK)
     withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize)) { hostStore =>
       devStore.setSpillStore(hostStore)
@@ -200,7 +200,6 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar {
         bigTable = null
         catalog.synchronousSpill(devStore, 0)
         verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer],
-          ArgumentMatchers.any[MemoryBuffer],
           ArgumentMatchers.any[Cuda.Stream])
         withResource(catalog.acquireBuffer(bigHandle)) { buffer =>
           assertResult(StorageTier.HOST)(buffer.storageTier)
@@ -218,13 +217,9 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar {
         catalog.synchronousSpill(devStore, 0)
         val rapidsBufferCaptor: ArgumentCaptor[RapidsBuffer] =
           ArgumentCaptor.forClass(classOf[RapidsBuffer])
-        val memoryBufferCaptor: ArgumentCaptor[MemoryBuffer] =
-          ArgumentCaptor.forClass(classOf[MemoryBuffer])
         verify(mockStore).copyBuffer(rapidsBufferCaptor.capture(),
-          memoryBufferCaptor.capture(),
           ArgumentMatchers.any[Cuda.Stream])
-        withResource(memoryBufferCaptor.getValue) { _ =>
-          assertResult(bigHandle.id)(rapidsBufferCaptor.getValue.id)
-        }
+        assertResult(bigHandle.id)(rapidsBufferCaptor.getValue.id)
       }
     }
   }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala
index f1de989b605..296f0cf2ce2 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala
@@ -34,6 +34,7 @@ class RmmSparkRetrySuiteBase extends FunSuite with BeforeAndAfterEach {
     }
     val deviceStorage = new RapidsDeviceMemoryStore()
     val catalog = new RapidsBufferCatalog(deviceStorage)
+    RapidsBufferCatalog.setDeviceStorage(deviceStorage)
     RapidsBufferCatalog.setCatalog(catalog)
     val mockEventHandler = new BaseRmmEventHandler()
     RmmSpark.setEventHandler(mockEventHandler)
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala
index cc55d9c6b0f..6f2d216c1eb 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala
@@ -53,6 +53,7 @@ class WithRetrySuite
     }
     val deviceStorage = new RapidsDeviceMemoryStore()
     val catalog = new RapidsBufferCatalog(deviceStorage)
+    RapidsBufferCatalog.setDeviceStorage(deviceStorage)
     RapidsBufferCatalog.setCatalog(catalog)
     val mockEventHandler = new BaseRmmEventHandler()
     RmmSpark.setEventHandler(mockEventHandler)
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala
index a8e6c789906..afc17081afb 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala
@@ -227,7 +227,7 @@ class RapidsShuffleIteratorSuite extends RapidsShuffleTestHelper {
       assert(cl.hasNext)
       assertResult(cb)(cl.next())
       assertResult(1)(testMetricsUpdater.totalRemoteBlocksFetched)
-      assertResult(mockBuffer.size)(testMetricsUpdater.totalRemoteBytesRead)
+      assertResult(mockBuffer.getMemoryUsedBytes)(testMetricsUpdater.totalRemoteBytesRead)
assertResult(10)(testMetricsUpdater.totalRowsFetched) } finally { RmmSpark.taskDone(taskId) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala index bb958280c6a..3eb73ef0f13 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala @@ -54,7 +54,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val stream = invocation.getArgument[Cuda.Stream](4) dst.copyFromMemoryBuffer(dstOffset, deviceBuffer, srcOffset, length, stream) } - when(mockBuffer.size).thenReturn(deviceBuffer.getLength) + when(mockBuffer.getPackedSizeBytes).thenReturn(deviceBuffer.getLength) when(mockBuffer.meta).thenReturn(mockMeta) mockBuffer } @@ -230,7 +230,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.size).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getPackedSizeBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -287,7 +287,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.size).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getPackedSizeBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -371,7 +371,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val rapidsBuffer = mock[RapidsBuffer] val tableMeta = MetaUtils.buildTableMeta(tableId, 456, bb, 100) when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.size).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getPackedSizeBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(tableId))) .thenReturn(rapidsBuffer) rapidsBuffer diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala index 0ebd3b54d49..1015192f8ed 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -47,8 +47,8 @@ class SpillableColumnarBatchSuite extends FunSuite { } class MockBuffer(override val id: RapidsBufferId) extends RapidsBuffer { - override val size: Long = 123 - override val meta: TableMeta = null + override def getMemoryUsedBytes: Long = 123 + override def meta: TableMeta = null override val storageTier: StorageTier = StorageTier.DEVICE override def getMemoryBuffer: MemoryBuffer = null override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long,