From e10b0c7dbc9b544f6e5b5a679034af3d1535487f Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 7 Mar 2023 17:46:36 -0600 Subject: [PATCH 01/10] Make tables spillable by default Signed-off-by: Alessandro Bellina --- docs/configs.md | 3 + .../spark/rapids/GpuDeviceManager.scala | 15 +- .../nvidia/spark/rapids/RapidsBuffer.scala | 175 ++++++++++- .../spark/rapids/RapidsBufferCatalog.scala | 78 +++-- .../spark/rapids/RapidsBufferStore.scala | 48 ++- .../com/nvidia/spark/rapids/RapidsConf.scala | 18 ++ .../rapids/RapidsDeviceMemoryStore.scala | 280 ++++++++++++++++-- .../nvidia/spark/rapids/RapidsDiskStore.scala | 16 +- .../nvidia/spark/rapids/RapidsGdsStore.scala | 26 +- .../spark/rapids/RapidsHostMemoryStore.scala | 70 +++-- .../spark/rapids/SpillableColumnarBatch.scala | 11 +- .../rapids/shuffle/BufferSendState.scala | 4 +- .../shuffle/RapidsShuffleIterator.scala | 2 +- .../spark/sql/rapids/stringFunctions.scala | 2 +- .../com/nvidia/spark/rapids/CastOpSuite.scala | 9 +- .../rapids/GpuCoalesceBatchesRetrySuite.scala | 2 +- .../rapids/RapidsBufferCatalogSuite.scala | 19 +- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 21 +- .../spark/rapids/RapidsDiskStoreSuite.scala | 2 +- .../spark/rapids/RapidsGdsStoreSuite.scala | 4 +- .../rapids/RapidsHostMemoryStoreSuite.scala | 15 +- .../spark/rapids/RmmSparkRetrySuiteBase.scala | 1 + .../nvidia/spark/rapids/WithRetrySuite.scala | 1 + .../shuffle/RapidsShuffleIteratorSuite.scala | 2 +- .../shuffle/RapidsShuffleServerSuite.scala | 16 +- .../rapids/SpillableColumnarBatchSuite.scala | 4 +- 26 files changed, 677 insertions(+), 167 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 554a98f976e..5362e593166 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -42,6 +42,7 @@ Name | Description | Default Value | Applicable at spark.rapids.alluxio.slow.disk|Indicates whether the disks used by Alluxio are slow. If it's true and reading S3 large files, Rapids Accelerator reads from S3 directly instead of reading from Alluxio caches. Refer to spark.rapids.alluxio.large.file.threshold which defines a threshold that identifying whether files are large. Typically, it's slow disks if speed is less than 300M/second. If using convert time spark.rapids.alluxio.replacement.algo, this may not apply to all file types like Delta files|true|Runtime spark.rapids.alluxio.user|Alluxio user is set on the Alluxio client, which is used to mount or get information. By default it should be the user that running the Alluxio processes. The default value is ubuntu.|ubuntu|Runtime spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: abfs, abfss, dbfs, gs, s3, s3a, s3n, wasbs. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None|Runtime +spark.rapids.filecache.checkStale|Controls whether the cached is checked for being out of date with respect to the input file. When enabled, the data that has been cached locally for a file will be invalidated if the file is updated after being cached. This feature is only necessary if an input file for a Spark application can be changed during the lifetime of the application. If an individual input file will not be overwritten during the Spark application then performance may be improved by setting this to false.|false|Startup spark.rapids.filecache.enabled|Controls whether the caching of input files is enabled. When enabled, input datais cached to the same local directories configured for the Spark application. The cache will use up to half the available space by default. To set an absolute cache size limit, see the spark.rapids.filecache.maxBytes configuration setting. Currently only data from Parquet files are cached.|false|Startup spark.rapids.filecache.maxBytes|Controls the maximum amount of data that will be cached locally. If left unspecified, it will use half of the available disk space detected on startup for the configured Spark local disks.|None|Startup spark.rapids.gpu.resourceName|The name of the Spark resource that represents a GPU that you want the plugin to use if using custom resources with Spark.|gpu|Startup @@ -84,6 +85,8 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|true|Runtime spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|true|Runtime spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false|Runtime +spark.rapids.sql.chunkedPack.bounceBufferSize|Amount of GPU memory (in bytes) to set aside at startup for the chunked pack bounce buffer, needed during spill from GPU to host memory. |134217728|Runtime +spark.rapids.sql.chunkedPack.poolSize|Amount of GPU memory (in bytes) to set aside at startup for the chunked pack scratch space, needed during spill from GPU to host memory.|10485760|Runtime spark.rapids.sql.coalescing.reader.numFilterParallel|This controls the number of files the coalescing reader will run in each thread when it filters blocks for reading. If this value is greater than zero the files will be filtered in a multithreaded manner where each thread filters the number of files set by this config. If this is set to zero the files are filtered serially. This uses the same thread pool as the multithreaded reader, see spark.rapids.sql.multiThreadedRead.numThreads. Note that filtering multithreaded is useful with Alluxio.|0|Runtime spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|2|Runtime spark.rapids.sql.concurrentWriterPartitionFlushSize|The flush size of the concurrent writer cache in bytes for each partition. If specified spark.sql.maxConcurrentOutputFileWriters, use concurrent writer to write data. Concurrent writer first caches data for each partition and begins to flush the data if it finds one partition with a size that is greater than or equal to this config. The default value is 0, which will try to select a size based off of file type specific configs. E.g.: It uses `write.parquet.row-group-size-bytes` config for Parquet type and `orc.stripe.size` config for Orc type. If the value is greater than 0, will use this positive value.Max value may get better performance but not always, because concurrent writer uses spillable cache and big value may cause more IO swaps.|0|Runtime diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index b03ffe4a067..25784678478 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -42,6 +42,11 @@ object GpuDeviceManager extends Logging { java.lang.Boolean.getBoolean("com.nvidia.spark.rapids.memory.gpu.rmm.init.task") } + // Memory resource used only for cudf::chunked_pack to allocate scratch space + // during spill to host. This is done to set aside some memory for this operation + // from the beginning of the job. + var chunkedPackMemoryResource: RmmPoolMemoryResource[RmmCudaMemoryResource] = null + // for testing only def setRmmTaskInitEnabled(enabled: Boolean): Unit = { rmmTaskInitEnabled = enabled @@ -140,6 +145,7 @@ object GpuDeviceManager extends Logging { def shutdown(): Unit = synchronized { // assume error during shutdown until we complete it + chunkedPackMemoryResource.close() singletonMemoryInitialized = Errored RapidsBufferCatalog.close() GpuShuffleEnv.shutdown() @@ -246,8 +252,15 @@ object GpuDeviceManager extends Logging { private def initializeRmm(gpuId: Int, rapidsConf: Option[RapidsConf] = None): Unit = { if (!Rmm.isInitialized) { val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf)) - val info = Cuda.memGetInfo() + val poolSize = conf.chunkedPackPoolSize + chunkedPackMemoryResource = + new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize) + logInfo( + s"Initialized pool resource for spill operations " + + s"of ${chunkedPackMemoryResource.getMaxSize} Bytes") + + val info = Cuda.memGetInfo() val poolAllocation = computeRmmPoolSize(conf, info) var init = RmmAllocationMode.CUDA_DEFAULT val features = ArrayBuffer[String]() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 61394b47850..f46455c0479 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -18,11 +18,15 @@ package com.nvidia.spark.rapids import java.io.File +import scala.collection.mutable.ArrayBuffer + import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta +import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.RapidsDiskBlockManager import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -59,16 +63,166 @@ object StorageTier extends Enumeration { val GDS: StorageTier = Value(3, "GPUDirect Storage") } +/** + * ChunkedPacker is an Iterator that uses a cudf::chunked_pack to copy a cuDF `Table` + * to a target buffer in chunks. + * + * Each chunk is sized at most `bounceBuffer.getLength`, and the caller should cudaMemcpy + * bytes from `bounceBuffer` to a target buffer after each call to `next()`. + * + * @note `ChunkedPacker` must be closed by the caller as it has GPU and host resources + * associated with it. + * + * @param id The RapidsBufferId for this pack operation to be included in the metadata + * @param table cuDF Table to chunk_pack + * @param bounceBuffer GPU memory to be used for packing. The buffer should be at least 1MB + * in length. + */ +class ChunkedPacker( + id: RapidsBufferId, + table: Table, + bounceBuffer: DeviceMemoryBuffer) + extends Iterator[MemoryBuffer] + with Logging + with AutoCloseable { + + private var closed: Boolean = false + + private val chunkedPack = + table.makeChunkedPack( + bounceBuffer.getLength, + GpuDeviceManager.chunkedPackMemoryResource) + + private val tableMeta = withResource(chunkedPack.buildMetadata()) { packedMeta => + MetaUtils.buildTableMeta( + id.tableId, + chunkedPack.getTotalContiguousSize, + packedMeta.getMetadataDirectBuffer, + table.getRowCount) + } + + // take out a lease on the bounce buffer + bounceBuffer.incRefCount() + + def getTotalContiguousSize: Long = chunkedPack.getTotalContiguousSize + + def getMeta: TableMeta = { + tableMeta + } + + override def hasNext: Boolean = { + !closed && chunkedPack.hasNext + } + + def next(): MemoryBuffer = { + val bytesWritten = chunkedPack.next(bounceBuffer) + // we increment the refcount because the caller has no idea where + // this memory came from, so it should close it. + bounceBuffer.slice(0, bytesWritten) + } + + override def close(): Unit = synchronized { + if (!closed) { + closed = true + val toClose = new ArrayBuffer[AutoCloseable]() + toClose.append(chunkedPack, bounceBuffer) + toClose.safeClose() + } + } +} + +/** + * This iterator encapsulates a buffer's internal `MemoryBuffer` access + * for spill reasons. Internally, there are two known implementations: + * - either this is a "single shot" copy, where the entirety of the `RapidsBuffer` is + * already represented as a single contiguous blob of memory, then the expectation + * is that this iterator is exhausted with a single call to `next` + * - or, we have a `RapidsBuffer` that isn't contiguous. This iteration will then + * drive a `ChunkedPacker` to pack the `RapidsBuffer`'s table as needed. The + * iterator will likely need several calls to `next` to be exhausted. + * + * @param buffer `RapidsBuffer` to copy out of its tier. + */ +class RapidsBufferCopyIterator(buffer: RapidsBuffer) + extends Iterator[MemoryBuffer] with AutoCloseable with Logging { + + private val chunkedPacker: Option[ChunkedPacker] = if (buffer.supportsChunkedPacker) { + Some(buffer.getChunkedPacker) + } else { + None + } + + def isChunked: Boolean = chunkedPacker.isDefined + + // this is used for the single shot case to flag when `next` is call + // to satisfy the Iterator interface + private var singleShotCopyHasNext: Boolean = false + private var singleShotBuffer: MemoryBuffer = _ + + if (!isChunked) { + singleShotCopyHasNext = true + singleShotBuffer = buffer.getMemoryBuffer + } + + override def hasNext: Boolean = + chunkedPacker.map(_.hasNext).getOrElse(singleShotCopyHasNext) + + override def next(): MemoryBuffer = { + require(hasNext, + "next called on exhausted iterator") + chunkedPacker.map(_.next()).getOrElse { + singleShotCopyHasNext = false + singleShotBuffer.slice(0, singleShotBuffer.getLength) + } + } + + def getTotalCopySize: Long = { + chunkedPacker + .map(_.getTotalContiguousSize) + .getOrElse(singleShotBuffer.getLength) + } + + override def close(): Unit = { + val hasNextBeforeClose = hasNext + val toClose = new ArrayBuffer[AutoCloseable]() + toClose.appendAll(chunkedPacker) + toClose.appendAll(Option(singleShotBuffer)) + + toClose.safeClose() + require(!hasNextBeforeClose, + "RapidsBufferCopyIterator was closed before exhausting") + } +} + /** Interface provided by all types of RAPIDS buffers */ trait RapidsBuffer extends AutoCloseable { /** The buffer identifier for this buffer. */ val id: RapidsBufferId - /** The size of this buffer in bytes. */ - val size: Long + /** + * The size of this buffer in bytes in its _current_ store. As the buffer goes through + * contiguous split (either added as a contiguous table already, or spilled to host), + * its size changes because contiguous_split adds its own alignment padding. + * + * @note Do not use this size to allocate a target buffer to copy, always use `getPackedSize.` + */ + def getMemoryUsedBytes: Long + + /** + * The size of this buffer if it has already gone through contiguous_split. + * + * @note Use this function when allocating a target buffer for spill or shuffle purposes. + */ + def getPackedSizeBytes: Long = getMemoryUsedBytes + + /** + * At spill time, obtain an iterator used to copy this buffer to a different tier. + */ + def getCopyIterator: RapidsBufferCopyIterator = + new RapidsBufferCopyIterator(this) /** Descriptor for how the memory buffer is formatted */ - val meta: TableMeta + def getMeta: TableMeta /** The storage tier for this buffer */ val storageTier: StorageTier @@ -94,6 +248,12 @@ trait RapidsBuffer extends AutoCloseable { */ def getMemoryBuffer: MemoryBuffer + val supportsChunkedPacker: Boolean = false + + def getChunkedPacker: ChunkedPacker = { + throw new NotImplementedError("not implemented for this store") + } + /** * Copy the content of this buffer into the specified memory buffer, starting from the given * offset. @@ -162,8 +322,10 @@ trait RapidsBuffer extends AutoCloseable { */ sealed class DegenerateRapidsBuffer( override val id: RapidsBufferId, - override val meta: TableMeta) extends RapidsBuffer { - override val size: Long = 0L + val meta: TableMeta) extends RapidsBuffer { + + override def getMemoryUsedBytes: Long = 0L + override val storageTier: StorageTier = StorageTier.DEVICE override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = { @@ -200,4 +362,7 @@ sealed class DegenerateRapidsBuffer( override def setSpillPriority(priority: Long): Unit = {} override def close(): Unit = {} + + /** Descriptor for how the memory buffer is formatted */ + override def getMeta: TableMeta = meta } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index ad0db09da7d..5848f395cfe 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -19,8 +19,8 @@ package com.nvidia.spark.rapids import java.util.concurrent.ConcurrentHashMap import java.util.function.BiFunction -import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange, Rmm} -import com.nvidia.spark.rapids.Arm.withResource +import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, NvtxColor, NvtxRange, Rmm, Table} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsBufferCatalog.getExistingRapidsBufferAndAcquire import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.StorageTier.StorageTier @@ -31,6 +31,7 @@ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId} import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.vectorized.ColumnarBatch /** * Exception thrown when inserting a buffer into the catalog with a duplicate buffer ID @@ -308,7 +309,6 @@ class RapidsBufferCatalog( tableMeta: TableMeta, initialSpillPriority: Long, needsSync: Boolean): RapidsBufferHandle = synchronized { - logDebug(s"Adding buffer ${id} to ${deviceStorage}") val rapidsBuffer = deviceStorage.addBuffer( id, buffer, @@ -319,6 +319,50 @@ class RapidsBufferCatalog( makeNewHandle(id, initialSpillPriority) } + /** + * Adds a batch to the device storage. This does NOT take ownership of the + * batch, so it is the responsibility of the caller to close it. + * + * @param batch batch that will be owned by the store + * @param initialSpillPriority starting spill priority value for the batch + * @param needsSync whether the spill framework should stream synchronize while adding + * this batch (defaults to true) + * @return RapidsBufferHandle handle for this RapidsBuffer + */ + def addBatch( + batch: ColumnarBatch, + initialSpillPriority: Long, + needsSync: Boolean = true): RapidsBufferHandle = { + closeOnExcept(GpuColumnVector.from(batch)) { table => + addTable(table, initialSpillPriority, needsSync) + } + } + + /** + * Adds a table to the device storage. + * + * This takes ownership of the table. + * + * @param table table that will be owned by the store + * @param initialSpillPriority starting spill priority value + * @param needsSync whether the spill framework should stream synchronize while adding + * this table (defaults to true) + * @return RapidsBufferHandle handle for this RapidsBuffer + */ + def addTable( + table: Table, + initialSpillPriority: Long, + needsSync: Boolean = true): RapidsBufferHandle = { + val id = TempSpillBufferId() + val rapidsBuffer = deviceStorage.addTable( + id, + table, + initialSpillPriority, + needsSync) + registerNewBuffer(rapidsBuffer) + makeNewHandle(id, initialSpillPriority) + } + /** * Register a degenerate RapidsBufferId given a TableMeta * @note this is called from the shuffle catalogs only @@ -407,7 +451,7 @@ class RapidsBufferCatalog( if (buffers == null || buffers.isEmpty) { throw new NoSuchElementException(s"Cannot locate buffer associated with ID: $id") } - buffers.head.meta + buffers.head.getMeta } /** @@ -482,7 +526,7 @@ class RapidsBufferCatalog( if (nextSpillable != null) { // we have a buffer (nextSpillable) to spill spillAndFreeBuffer(nextSpillable, spillStore, stream) - totalSpilled += nextSpillable.size + totalSpilled += nextSpillable.getMemoryUsedBytes } } else { rmmShouldRetryAlloc = true @@ -527,7 +571,7 @@ class RapidsBufferCatalog( trySpillToMaximumSize(buffer, spillStore, stream) // copy the buffer to spillStore - val newBuffer = spillStore.copyBuffer(buffer, buffer.getMemoryBuffer, stream) + val newBuffer = spillStore.copyBuffer(buffer, stream) // once spilled, we get back a new RapidsBuffer instance in this new tier registerNewBuffer(newBuffer) @@ -555,7 +599,7 @@ class RapidsBufferCatalog( // this spillStore has a maximum size requirement (host only). We need to spill from it // in order to make room for `buffer`. val targetTotalSize = - math.max(spillStoreMaxSize.get - buffer.size, 0) + math.max(spillStoreMaxSize.get - buffer.getMemoryUsedBytes, 0) val maybeAmountSpilled = synchronousSpill(spillStore, targetTotalSize, stream) maybeAmountSpilled.foreach { amountSpilled => if (amountSpilled != 0) { @@ -570,29 +614,21 @@ class RapidsBufferCatalog( * Copies `buffer` to the `deviceStorage` store, registering a new `RapidsBuffer` in * the process * @param buffer - buffer to copy - * @param memoryBuffer - cuDF MemoryBuffer to copy from * @param stream - Cuda.Stream to synchronize on * @return - The `RapidsBuffer` instance that was added to the device store. */ def unspillBufferToDeviceStore( buffer: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBuffer = synchronized { // try to acquire the buffer, if it's already in the store // do not create a new one, else add a reference acquireBuffer(buffer.id, StorageTier.DEVICE) match { case None => - val newBuffer = deviceStorage.copyBuffer( - buffer, - memoryBuffer, - stream) + val newBuffer = deviceStorage.copyBuffer(buffer, stream) newBuffer.addReference() // add a reference since we are about to use it registerNewBuffer(newBuffer) newBuffer - case Some(existingBuffer) => - withResource(memoryBuffer) { _ => - existingBuffer - } + case Some(existingBuffer) => existingBuffer } } @@ -702,7 +738,7 @@ object RapidsBufferCatalog extends Logging { // We are going to re-initialize so make sure all of the old things were closed... closeImpl() assert(memoryEventHandler == null) - deviceStorage = new RapidsDeviceMemoryStore() + deviceStorage = new RapidsDeviceMemoryStore(rapidsConf.chunkedPackBounceBufferSize) diskBlockManager = new RapidsDiskBlockManager(conf) if (rapidsConf.isGdsSpillEnabled) { gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize) @@ -813,6 +849,12 @@ object RapidsBufferCatalog extends Logging { singleton.addBuffer(buffer, tableMeta, initialSpillPriority) } + def addBatch( + batch: ColumnarBatch, + initialSpillPriority: Long): RapidsBufferHandle = { + singleton.addBatch(batch, initialSpillPriority) + } + /** * Lookup the buffer that corresponds to the specified buffer handle and acquire it. * NOTE: It is the responsibility of the caller to close the buffer. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 61511f22e73..19ea2765943 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -66,14 +66,14 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (old != null) { throw new DuplicateBufferException(s"duplicate buffer registered: ${buffer.id}") } - totalBytesStored += buffer.size + totalBytesStored += buffer.getMemoryUsedBytes // device buffers "spillability" is handled via DeviceMemoryBuffer ref counting // so spillableOnAdd should be false, all other buffer tiers are spillable at // all times. if (spillableOnAdd) { if (spillable.offer(buffer)) { - totalBytesSpillable += buffer.size + totalBytesSpillable += buffer.getMemoryUsedBytes } } } @@ -83,9 +83,9 @@ abstract class RapidsBufferStore(val tier: StorageTier) spilling.remove(id) val obj = buffers.remove(id) if (obj != null) { - totalBytesStored -= obj.size + totalBytesStored -= obj.getMemoryUsedBytes if (spillable.remove(obj)) { - totalBytesSpillable -= obj.size + totalBytesSpillable -= obj.getMemoryUsedBytes } } } @@ -119,14 +119,14 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (!spilling.contains(buffer.id) && buffers.containsKey(buffer.id)) { // try to add it to the spillable collection if (spillable.offer(buffer)) { - totalBytesSpillable += buffer.size + totalBytesSpillable += buffer.getMemoryUsedBytes logDebug(s"Buffer ${buffer.id} is spillable. " + s"total=${totalBytesStored} spillable=${totalBytesSpillable}") } // else it was already there (unlikely) } } else { if (spillable.remove(buffer)) { - totalBytesSpillable -= buffer.size + totalBytesSpillable -= buffer.getMemoryUsedBytes logDebug(s"Buffer ${buffer.id} is not spillable. " + s"total=${totalBytesStored}, spillable=${totalBytesSpillable}") } // else it was already removed @@ -138,8 +138,8 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (buffer != null) { // mark the id as "spilling" (this buffer is in the middle of a spill operation) spilling.add(buffer.id) - totalBytesSpillable -= buffer.size - logDebug(s"Spilling buffer ${buffer.id}. size=${buffer.size} " + + totalBytesSpillable -= buffer.getMemoryUsedBytes + logDebug(s"Spilling buffer ${buffer.id}. size=${buffer.getMemoryUsedBytes} " + s"total=${totalBytesStored}, new spillable=${totalBytesSpillable}") } buffer @@ -195,17 +195,13 @@ abstract class RapidsBufferStore(val tier: StorageTier) * (i.e.: this method will not take ownership of the incoming buffer object). * This does not need to update the catalog, the caller is responsible for that. * @param buffer data from another store - * @param memoryBuffer memory buffer obtained from the specified Rapids buffer. The ownership - * for `memoryBuffer` is transferred to this store. The store may close - * `memoryBuffer` if necessary. * @param stream CUDA stream to use for copy or null * @return the new buffer that was created */ def copyBuffer( buffer: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { - freeOnExcept(createBuffer(buffer, memoryBuffer, stream)) { newBuffer => + freeOnExcept(createBuffer(buffer, stream)) { newBuffer => addBuffer(newBuffer) newBuffer } @@ -227,15 +223,11 @@ abstract class RapidsBufferStore(val tier: StorageTier) * @note DO NOT close the buffer unless adding a reference! * @note `createBuffer` impls should synchronize against `stream` before returning, if needed. * @param buffer data from another store - * @param memoryBuffer memory buffer obtained from the specified Rapids buffer. The ownership - * for `memoryBuffer` is transferred to this store. The store may close - * `memoryBuffer` if necessary. * @param stream CUDA stream to use or null * @return the new buffer that was created. */ protected def createBuffer( buffer: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase /** Update bookkeeping for a new buffer */ @@ -254,8 +246,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) /** Base class for all buffers in this store. */ abstract class RapidsBufferBase( override val id: RapidsBufferId, - override val size: Long, - override val meta: TableMeta, + val meta: TableMeta, initialSpillPriority: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends RapidsBuffer { @@ -270,6 +261,8 @@ abstract class RapidsBufferStore(val tier: StorageTier) /** Release the underlying resources for this buffer. */ protected def releaseResources(): Unit + override def getMeta(): TableMeta = meta + /** * Materialize the memory buffer from the underlying storage. * @@ -302,11 +295,12 @@ abstract class RapidsBufferStore(val tier: StorageTier) protected def columnarBatchFromDeviceBuffer(devBuffer: DeviceMemoryBuffer, sparkTypes: Array[DataType]): ColumnarBatch = { - val bufferMeta = meta.bufferMeta() + val tableMeta = getMeta() + val bufferMeta = tableMeta.bufferMeta() if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { - MetaUtils.getBatchFromMeta(devBuffer, meta, sparkTypes) + MetaUtils.getBatchFromMeta(devBuffer, tableMeta, sparkTypes) } else { - GpuCompressedColumnVector.from(devBuffer, meta) + GpuCompressedColumnVector.from(devBuffer, tableMeta) } } @@ -344,7 +338,6 @@ abstract class RapidsBufferStore(val tier: StorageTier) logDebug(s"Unspilling $this $id to $DEVICE") val newBuffer = catalog.unspillBufferToDeviceStore( this, - materializeMemoryBuffer, Cuda.DEFAULT_STREAM) withResource(newBuffer) { _ => return newBuffer.getDeviceMemoryBuffer @@ -360,8 +353,9 @@ abstract class RapidsBufferStore(val tier: StorageTier) materializeMemoryBuffer match { case h: HostMemoryBuffer => withResource(h) { _ => - closeOnExcept(DeviceMemoryBuffer.allocate(size)) { deviceBuffer => - logDebug(s"copying from host $h to device $deviceBuffer") + closeOnExcept(DeviceMemoryBuffer.allocate(h.getLength)) { deviceBuffer => + logDebug(s"copying ${h.getLength} from host $h to device $deviceBuffer " + + s"of size ${deviceBuffer.getLength}") deviceBuffer.copyFromHostBuffer(h) deviceBuffer } @@ -403,7 +397,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) freeBuffer() } } else { - logWarning(s"Trying to free an invalid buffer => $id, size = $size, $this") + logWarning(s"Trying to free an invalid buffer => $id, size = ${getMemoryUsedBytes}, $this") } } @@ -421,6 +415,6 @@ abstract class RapidsBufferStore(val tier: StorageTier) releaseResources() } - override def toString: String = s"$name buffer size=$size" + override def toString: String = s"$name buffer size=${getMemoryUsedBytes}" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 2258a42f5e2..8675d7e4b8e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1856,6 +1856,20 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val CHUNKED_PACK_POOL_SIZE = conf("spark.rapids.sql.chunkedPack.poolSize") + .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + + "scratch space, needed during spill from GPU to host memory.") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(10L*1024*1024) + + val CHUNKED_PACK_BOUNCE_BUFFER_SIZE = conf("spark.rapids.sql.chunkedPack.bounceBufferSize") + .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + + "bounce buffer, needed during spill from GPU to host memory. ") + .bytesConf(ByteUnit.BYTE) + .checkValue(v => v >= 1L*1024*1024, + "The chunked pack bounce buffer must be at least 1MB in size") + .createWithDefault(128L * 1024 * 1024) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -2462,6 +2476,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isAqeExchangeReuseFixupEnabled: Boolean = get(ENABLE_AQE_EXCHANGE_REUSE_FIXUP) + lazy val chunkedPackPoolSize: Long = get(CHUNKED_PACK_POOL_SIZE) + + lazy val chunkedPackBounceBufferSize: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_SIZE) + private val optimizerDefaults = Map( // this is not accurate because CPU projections do have a cost due to appending values // to each row that is produced, but this needs to be a really small number because diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 5f1a313729f..b67e4e4ca5f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -16,48 +16,61 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer} +import java.util.concurrent.ConcurrentHashMap + +import ai.rapids.cudf.{ColumnVector, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta +import org.apache.spark.sql.rapids.TempSpillBufferId import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch /** * Buffer storage using device memory. - * @param catalog catalog to register this store + * @param chunkedPackBounceBufferSize this is the size of the bounce buffer to be used + * during spill in chunked_pack. The parameter defaults to 1MB, + * since that is the minimum size of this buffer, but + * ideally it should be closer to 128MB for an A100 with the + * rule-of-thumb of 1MB per SM. */ -class RapidsDeviceMemoryStore +class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) extends RapidsBufferStore(StorageTier.DEVICE) { // The RapidsDeviceMemoryStore handles spillability via ref counting override protected def spillableOnAdd: Boolean = false + // bounce buffer to be used during chunked pack in GPU to host memory spill + private var chunkedPackBounceBuffer: DeviceMemoryBuffer = + DeviceMemoryBuffer.allocate(chunkedPackBounceBufferSize) + override protected def createBuffer( other: RapidsBuffer, - memoryBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { - val deviceBuffer = { - memoryBuffer match { - case d: DeviceMemoryBuffer => d - case h: HostMemoryBuffer => - withResource(h) { _ => - closeOnExcept(DeviceMemoryBuffer.allocate(other.size)) { deviceBuffer => + val memoryBuffer = withResource(other.getCopyIterator) { copyIterator => + copyIterator.next() + } + withResource(memoryBuffer) { _ => + val deviceBuffer = { + memoryBuffer match { + case d: DeviceMemoryBuffer => d + case h: HostMemoryBuffer => + closeOnExcept(DeviceMemoryBuffer.allocate(memoryBuffer.getLength)) { deviceBuffer => logDebug(s"copying from host $h to device $deviceBuffer") deviceBuffer.copyFromHostBuffer(h, stream) deviceBuffer } - } - case b => throw new IllegalStateException(s"Unrecognized buffer: $b") + case b => throw new IllegalStateException(s"Unrecognized buffer: $b") + } } + new RapidsDeviceMemoryBuffer( + other.id, + deviceBuffer.getLength, + other.getMeta, + deviceBuffer, + other.getSpillPriority) } - new RapidsDeviceMemoryBuffer( - other.id, - other.size, - other.meta, - deviceBuffer, - other.getSpillPriority) } /** @@ -90,10 +103,40 @@ class RapidsDeviceMemoryStore initialSpillPriority) freeOnExcept(rapidsBuffer) { _ => logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + - s"uncompressed=${rapidsBuffer.meta.bufferMeta.uncompressedSize}, " + + s"uncompressed=${rapidsBuffer.getMeta().bufferMeta.uncompressedSize}, " + s"meta_id=${tableMeta.bufferMeta.id}, " + s"meta_size=${tableMeta.bufferMeta.size}]") - addDeviceBuffer(rapidsBuffer, needsSync) + addBuffer(rapidsBuffer, needsSync) + rapidsBuffer + } + } + + /** + * Adds a table to the device storage. + * + * This takes ownership of the table. + * + * This function is called only from the RapidsBufferCatalog, under the + * catalog lock. + * + * @param id the RapidsBufferId to use for this table + * @param table table that will be owned by the store + * @param initialSpillPriority starting spill priority value + * @param needsSync whether the spill framework should stream synchronize while adding + * this table (defaults to true) + * @return the RapidsBuffer instance that was added. + */ + def addTable( + id: TempSpillBufferId, + table: Table, + initialSpillPriority: Long, + needsSync: Boolean): RapidsBuffer = { + val rapidsBuffer = new RapidsTable( + id, + table, + initialSpillPriority) + freeOnExcept(rapidsBuffer) { _ => + addBuffer(rapidsBuffer, needsSync) rapidsBuffer } } @@ -105,8 +148,8 @@ class RapidsDeviceMemoryStore * * @param needsSync true if we should stream synchronize before adding the buffer */ - private def addDeviceBuffer( - buffer: RapidsDeviceMemoryBuffer, + private def addBuffer( + buffer: RapidsBufferBase, needsSync: Boolean): Unit = { if (needsSync) { Cuda.DEFAULT_STREAM.sync() @@ -122,15 +165,201 @@ class RapidsDeviceMemoryStore doSetSpillable(buffer, spillable) } + /** + * A per cuDF column event handler that handles calls to .close() + * inside of the `ColumnVector` lock. + * @param rapidsTable the `RapidsTable` this handler was associated with + * @param columnIx the index of the column that this handler belongs to + * @param wrapped an optional RapidsDeviceColumnEventHandler that could be + * not None if this column has been added multiple times to the + * spill store. + */ + class RapidsDeviceColumnEventHandler( + val rapidsTable: RapidsTable, + columnIx: Int, + var wrapped: Option[RapidsDeviceColumnEventHandler] = None) + extends ColumnVector.EventHandler { + + override def onClosed(refCount: Int): Unit = { + // We trigger callbacks iff we reach `refCount` of 1 for this column. + // This signals the `RapidsTable` that a column at index `columnIx` has become + // spillable again. + if (refCount == 1) { + rapidsTable.onColumnSpillable(columnIx) + wrapped.foreach(_.onClosed(refCount)) + } + } + } + + /** + * A `RapidsTable` is the spill store holder of a cuDF `Table`. + * + * The table is not contiguous in GPU memory. Instead, this `RapidsBuffer` instance + * allows us to use the cuDF chunked_pack API to make the table contiguous as the spill + * is happening. + * + * This class owns the cuDF table and will close it when `close` is called. + * + * @param id the `RapidsBufferId` this table is associated with + * @param table the cuDF table that we are managing + * @param spillPriority a starting spill priority + */ + class RapidsTable( + id: TempSpillBufferId, + table: Table, + spillPriority: Long) + extends RapidsBufferBase( + id, + null, + spillPriority) { + + // we register our event callbacks as the very first action to deal with + // spillability + registerOnCloseEventHandler() + + // By default all columns are NOT spillable since we are not the only owners of + // the columns (the caller is holding onto a ColumnarBatch that will be closed + // after instantiation, triggering onClosed callbacks) + // This hash set contains the columns that are currently spillable + private val columnSpillability = new ConcurrentHashMap[Int, Boolean]() + + /** Release the underlying resources for this buffer. */ + override protected def releaseResources(): Unit = { + table.close() + } + + /** The storage tier for this buffer */ + override val storageTier: StorageTier = StorageTier.DEVICE + + override val supportsChunkedPacker: Boolean = true + + private var initializedChunkedPacker: Boolean = false + + lazy val chunkedPacker: ChunkedPacker = { + initializedChunkedPacker = true + new ChunkedPacker(id, table, chunkedPackBounceBuffer) + } + + override def getMeta(): TableMeta = { + chunkedPacker.getMeta + } + + // This is the current size in batch form. It is to be used while this + // table hasn't migrated to another store. + private val unpackedSizeInBytes: Long = GpuColumnVector.getTotalDeviceMemoryUsed(table) + + override def getMemoryUsedBytes: Long = unpackedSizeInBytes + + override def getPackedSizeBytes: Long = getChunkedPacker.getTotalContiguousSize + + override def getChunkedPacker: ChunkedPacker = { + chunkedPacker + } + + /** + * Called from RapidsDeviceColumnEventHandler.onClosed while holding onto the lock + * of a column in our table. + * + * @param columnIx - index of column to mark spillable + */ + def onColumnSpillable(columnIx: Int): Unit = { + columnSpillability.put(columnIx, true) + doSetSpillable(this, columnSpillability.size == table.getNumberOfColumns) + } + + /** + * Produce a `ColumnarBatch` from our table, and in the process make ourselves + * not spillable. + * + * @param sparkTypes the spark data types the batch should have + */ + override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = { + columnSpillability.clear() + doSetSpillable(this, false) + GpuColumnVector.from(table, sparkTypes) + } + + /** + * Get the underlying memory buffer. This may be either a HostMemoryBuffer or a + * DeviceMemoryBuffer depending on where the buffer currently resides. + * The caller must have successfully acquired the buffer beforehand. + * + * @see [[addReference]] + * @note It is the responsibility of the caller to close the buffer. + */ + override def getMemoryBuffer: MemoryBuffer = { + throw new UnsupportedOperationException( + "RapidsDeviceMemoryBatch doesn't support getMemoryBuffer") + } + + override def free(): Unit = { + // lets remove our handler from the chain of handlers for each column + removeOnCloseEventHandler() + super.free() + if (initializedChunkedPacker) { + chunkedPacker.close() + initializedChunkedPacker = false + } + } + + private def registerOnCloseEventHandler(): Unit = { + val cudfColumns = (0 until table.getNumberOfColumns).map(table.getColumn) + cudfColumns.zipWithIndex.foreach { case (cv, columnIx) => + cv.synchronized { + val priorEventHandler = cv.getEventHandler.asInstanceOf[RapidsDeviceColumnEventHandler] + val columnEventHandler = + new RapidsDeviceColumnEventHandler( + this, + columnIx, + Option(priorEventHandler)) + cv.setEventHandler(columnEventHandler) + } + } + } + + private def removeOnCloseEventHandler(): Unit = { + val cudfColumns = (0 until table.getNumberOfColumns).map(table.getColumn) + cudfColumns.foreach { cv => + cv.synchronized { + cv.getEventHandler match { + case handler: RapidsDeviceColumnEventHandler => + // find the event handler that belongs to this rapidsBuffer + var priorEventHandler = handler + var parent = priorEventHandler + while (priorEventHandler != null && priorEventHandler.rapidsTable != this) { + parent = priorEventHandler + priorEventHandler = priorEventHandler.wrapped.orNull + } + // if our handler is at the head + if (priorEventHandler == handler) { + cv.setEventHandler(priorEventHandler.wrapped.orNull) + } else { + // remove ourselves from the chain + parent.wrapped = if (priorEventHandler != null) { + priorEventHandler.wrapped + } else { + null + } + } + case t => + throw new IllegalStateException(s"Unknown column event handler $t") + } + } + } + } + } + class RapidsDeviceMemoryBuffer( id: RapidsBufferId, size: Long, meta: TableMeta, contigBuffer: DeviceMemoryBuffer, spillPriority: Long) - extends RapidsBufferBase(id, size, meta, spillPriority) + extends RapidsBufferBase(id, meta, spillPriority) with MemoryBuffer.EventHandler { + override def getMemoryUsedBytes(): Long = size + override val storageTier: StorageTier = StorageTier.DEVICE // If this require triggers, we are re-adding a `DeviceMemoryBuffer` outside of @@ -204,4 +433,9 @@ class RapidsDeviceMemoryStore super.free() } } + override def close(): Unit = { + super.close() + chunkedPackBounceBuffer.close() + chunkedPackBounceBuffer = null + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index 10d9da5652a..268b85e3e10 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -34,8 +34,12 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) override protected def createBuffer( incoming: RapidsBuffer, - incomingBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { + // assuming that the disk store gets contiguous buffers + val incomingBuffer = + withResource(incoming.getCopyIterator) { incomingCopyIterator => + incomingCopyIterator.next() + } withResource(incomingBuffer) { _ => val hostBuffer = incomingBuffer match { case h: HostMemoryBuffer => h @@ -55,12 +59,12 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) } else { copyBufferToPath(hostBuffer, path, append = false) } - logDebug(s"Spilled to $path $fileOffset:${incoming.size}") + logDebug(s"Spilled to $path $fileOffset:${incomingBuffer.getLength}") new RapidsDiskBuffer( id, fileOffset, - incoming.size, - incoming.meta, + incomingBuffer.getLength, + incoming.getMeta, incoming.getSpillPriority) } } @@ -94,9 +98,11 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) meta: TableMeta, spillPriority: Long) extends RapidsBufferBase( - id, size, meta, spillPriority) { + id, meta, spillPriority) { private[this] var hostBuffer: Option[HostMemoryBuffer] = None + override def getMemoryUsedBytes(): Long = size + override val storageTier: StorageTier = StorageTier.DISK override def getMemoryBuffer: MemoryBuffer = synchronized { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 4f94939259f..9da4a9d7a9b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -37,8 +37,14 @@ class RapidsGdsStore( extends RapidsBufferStore(StorageTier.GDS) { private[this] val batchSpiller = new BatchSpiller() - override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer, + override protected def createBuffer( + other: RapidsBuffer, stream: Cuda.Stream): RapidsBufferBase = { + // assume that we get 1 buffer + val otherBuffer = withResource(other.getCopyIterator) { it => + it.next() + } + withResource(otherBuffer) { _ => val deviceBuffer = otherBuffer match { case d: BaseDeviceMemoryBuffer => d @@ -59,12 +65,14 @@ class RapidsGdsStore( abstract class RapidsGdsBuffer( override val id: RapidsBufferId, - override val size: Long, + val size: Long, override val meta: TableMeta, spillPriority: Long) - extends RapidsBufferBase(id, size, meta, spillPriority) { + extends RapidsBufferBase(id, meta, spillPriority) { override val storageTier: StorageTier = StorageTier.GDS + override def getMemoryUsedBytes(): Long = size + override def getMemoryBuffer: MemoryBuffer = getDeviceMemoryBuffer } @@ -122,13 +130,13 @@ class RapidsGdsStore( CuFile.writeDeviceBufferToFile(path, 0, deviceBuffer) 0 } - logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS") + logDebug(s"Spilled to $path $fileOffset:${deviceBuffer.getLength} via GDS") new RapidsGdsSingleShotBuffer( id, path, fileOffset, - other.size, - other.meta, + deviceBuffer.getLength, + other.getMeta, other.getSpillPriority) } @@ -169,8 +177,8 @@ class RapidsGdsStore( id, currentFile, currentOffset, - other.size, - other.meta, + deviceBuffer.getLength, + other.getMeta, other.getSpillPriority) currentOffset += alignUp(deviceBuffer.getLength) pendingBuffers += gdsBuffer @@ -223,6 +231,8 @@ class RapidsGdsStore( var isPending: Boolean = true) extends RapidsGdsBuffer(id, size, meta, spillPriority) { + override def getMemoryUsedBytes(): Long = size + override def materializeMemoryBuffer: MemoryBuffer = this.synchronized { closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer => if (isPending) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 360cb7fcf78..1a90562416f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool} +import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange, PinnedMemoryPool} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_DIRECT_OFFSET, HOST_MEMORY_BUFFER_PAGEABLE_OFFSET, HOST_MEMORY_BUFFER_PINNED_OFFSET} import com.nvidia.spark.rapids.StorageTier.StorageTier @@ -42,10 +42,15 @@ class RapidsHostMemoryStore( override def getMaxSize: Option[Long] = Some(maxSize) - private def allocateHostBuffer(size: Long): (HostMemoryBuffer, AllocationMode) = { - var buffer: HostMemoryBuffer = PinnedMemoryPool.tryAllocate(size) - if (buffer != null) { - return (buffer, Pinned) + private def allocateHostBuffer( + size: Long, + preferPinned: Boolean = true): (HostMemoryBuffer, AllocationMode) = { + var buffer: HostMemoryBuffer = null + if (preferPinned) { + PinnedMemoryPool.tryAllocate(size) + if (buffer != null) { + return (buffer, Pinned) + } } val allocation = addressAllocator.allocate(size) @@ -62,26 +67,46 @@ class RapidsHostMemoryStore( (HostMemoryBuffer.allocate(size, false), Direct) } - override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer, + override protected def createBuffer( + other: RapidsBuffer, stream: Cuda.Stream): RapidsBufferBase = { - withResource(otherBuffer) { _ => - val (hostBuffer, allocationMode) = allocateHostBuffer(other.size) - try { - otherBuffer match { - case devBuffer: DeviceMemoryBuffer => - hostBuffer.copyFromDeviceBuffer(devBuffer, stream) - case _ => - throw new IllegalStateException("copying from buffer without device memory") + withResource(other.getCopyIterator) { otherBufferIterator => + val isChunked = otherBufferIterator.isChunked + val totalCopySize = otherBufferIterator.getTotalCopySize + val (hostBuffer, allocationMode) = allocateHostBuffer(totalCopySize, false) + withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ => + var hostOffset = 0L + val start = System.nanoTime() + while (otherBufferIterator.hasNext) { + val otherBuffer = otherBufferIterator.next() + withResource(otherBuffer) { _ => + try { + otherBuffer match { + case devBuffer: DeviceMemoryBuffer => + hostBuffer.copyFromMemoryBufferAsync( + hostOffset, devBuffer, 0, otherBuffer.getLength, stream) + hostOffset += otherBuffer.getLength + case _ => + throw new IllegalStateException("copying from buffer without device memory") + } + } catch { + case e: Exception => + hostBuffer.close() + throw e + } + } } - } catch { - case e: Exception => - hostBuffer.close() - throw e + stream.sync() + val end = System.nanoTime() + val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong + val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong + logDebug(s"Spill to host (mode=$allocationMode, chunked=$isChunked) " + + s"size=$szMB MiB bandwidth=$bw MiB/sec") } new RapidsHostMemoryBuffer( other.id, - other.size, - other.meta, + totalCopySize, + other.getMeta, applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), hostBuffer, allocationMode) @@ -103,7 +128,7 @@ class RapidsHostMemoryStore( buffer: HostMemoryBuffer, allocationMode: AllocationMode) extends RapidsBufferBase( - id, size, meta, spillPriority) { + id, meta, spillPriority) { override val storageTier: StorageTier = StorageTier.HOST override def getMemoryBuffer: MemoryBuffer = { @@ -121,5 +146,8 @@ class RapidsHostMemoryStore( } buffer.close() } + + /** The size of this buffer in bytes. */ + override def getMemoryUsedBytes: Long = size } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 689645fae30..4a3835fba1c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -97,7 +97,7 @@ class SpillableColumnarBatchImpl ( } override lazy val sizeInBytes: Long = - withRapidsBuffer(_.size) + withRapidsBuffer(_.getMemoryUsedBytes) /** * Set a new spill priority. @@ -206,14 +206,7 @@ object SpillableColumnarBatch { val buff = cv.getBuffer RapidsBufferCatalog.addBuffer(buff, cv.getTableMeta, initialSpillPriority) } else { - withResource(GpuColumnVector.from(batch)) { tmpTable => - withResource(tmpTable.contiguousSplit()) { contigTables => - require(contigTables.length == 1, "Unexpected number of contiguous spit tables") - RapidsBufferCatalog.addContiguousTable( - contigTables.head, - initialSpillPriority) - } - } + RapidsBufferCatalog.addBatch(batch, initialSpillPriority) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala index 198ad9dbb86..628f1370e18 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala @@ -82,8 +82,8 @@ class BufferSendState( val bufferTransferRequest = transferRequest.requests(btr, ix) withResource(requestHandler.acquireShuffleBuffer( bufferTransferRequest.bufferId())) { table => - bufferMetas(ix) = table.meta.bufferMeta() - new SendBlock(bufferTransferRequest.bufferId(), table.size) + bufferMetas(ix) = table.getMeta.bufferMeta() + new SendBlock(bufferTransferRequest.bufferId(), table.getPackedSizeBytes) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala index a7a4f984d21..6c4a3d54d45 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala @@ -362,7 +362,7 @@ class RapidsShuffleIterator( try { sb = catalog.acquireBuffer(handle) cb = sb.getColumnarBatch(sparkTypes) - metricsUpdater.update(blockedTime, 1, sb.size, cb.numRows()) + metricsUpdater.update(blockedTime, 1, sb.getMemoryUsedBytes, cb.numRows()) } finally { nvtxRangeAfterGettingBatch.close() range.close() diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala index 4a923edcdcf..2c7a245ce73 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala @@ -763,7 +763,7 @@ trait HasGpuStringReplace { strExpr: GpuColumnVector, search: Seq[String], replacement: String): ColumnVector = { - withResource(ColumnVector.fromStrings(search: _*)) { targets => + withResource(ColumnVector.fromStrings(search: _*)) { targets => withResource(ColumnVector.fromStrings(replacement)) { repls => strExpr.getBase.stringReplace(targets, repls) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 0014828fc42..04a1a2917fd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -25,16 +25,23 @@ import java.util.TimeZone import scala.collection.JavaConverters._ import scala.util.{Failure, Random, Success, Try} +import org.scalatest.BeforeAndAfterAll + import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, NamedExpression} import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types._ -class CastOpSuite extends GpuExpressionTestSuite { +class CastOpSuite extends GpuExpressionTestSuite with BeforeAndAfterAll { import CastOpSuite._ + override def afterAll(): Unit = { + TrampolineUtil.cleanupAnyExistingSession() + } + private val sparkConf = new SparkConf() .set(RapidsConf.ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES.key, "true") .set(RapidsConf.ENABLE_CAST_STRING_TO_FLOAT.key, "true") diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala index ea9d4b88ccc..6e8500da1b7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala @@ -204,7 +204,7 @@ class GpuCoalesceBatchesRetrySuite val batches = iter.asInstanceOf[CoalesceIteratorMocks].getBatches() assertResult(10)(batches.length) batches.foreach(b => - verify(b, times(1)).close() + verify(b, times(3)).close() ) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala index 88e0ab857ae..3112cc89759 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala @@ -227,26 +227,33 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { withResource(handle) { _ => catalog.synchronousSpill(deviceStore, 0) val acquiredHostBuffer = catalog.acquireBuffer(handle) - withResource(acquiredHostBuffer) { _ => + val unspilled = withResource(acquiredHostBuffer) { _ => assertResult(HOST)(acquiredHostBuffer.storageTier) val unspilled = catalog.unspillBufferToDeviceStore( acquiredHostBuffer, - acquiredHostBuffer.getMemoryBuffer, Cuda.DEFAULT_STREAM) withResource(unspilled) { _ => assertResult(DEVICE)(unspilled.storageTier) } val unspilledSame = catalog.unspillBufferToDeviceStore( acquiredHostBuffer, - acquiredHostBuffer.getMemoryBuffer, Cuda.DEFAULT_STREAM) withResource(unspilledSame) { _ => assertResult(unspilled)(unspilledSame) } // verify that we invoked the copy function exactly once - verify(deviceStore, times(1)).copyBuffer(any(), any(), any()) + verify(deviceStore, times(1)).copyBuffer(any(), any()) + unspilled } + val unspilledSame = catalog.unspillBufferToDeviceStore( + acquiredHostBuffer, + Cuda.DEFAULT_STREAM) + withResource(unspilledSame) { _ => + assertResult(unspilled)(unspilledSame) + } + // verify that we invoked the copy function exactly once + verify(deviceStore, times(1)).copyBuffer(any(), any()) } } } @@ -322,8 +329,8 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { var _acquireAttempts: Int = acquireAttempts var currentPriority: Long = initialPriority override val id: RapidsBufferId = bufferId - override val size: Long = 0 - override val meta: TableMeta = tableMeta + override def getMemoryUsedBytes: Long = 0 + override def getMeta: TableMeta = tableMeta override val storageTier: StorageTier = tier override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = null override def getMemoryBuffer: MemoryBuffer = null diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 5c7b8d8b67d..2246b6d2117 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -168,7 +168,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { val resultBuffer = captor.getValue assertResult(bufferId)(resultBuffer.id) assertResult(spillPriority)(resultBuffer.getSpillPriority) - assertResult(meta)(resultBuffer.meta) + assertResult(meta)(resultBuffer.getMeta) } } @@ -229,13 +229,6 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { } } - test("cannot receive spilled buffers") { - withResource(new RapidsDeviceMemoryStore) { store => - assertThrows[IllegalStateException](store.copyBuffer( - mock[RapidsBuffer], mock[MemoryBuffer], Cuda.DEFAULT_STREAM)) - } - } - test("size statistics") { withResource(new RapidsDeviceMemoryStore) { store => @@ -315,22 +308,22 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { override protected def createBuffer( b: RapidsBuffer, - m: MemoryBuffer, s: Cuda.Stream): RapidsBufferBase = { - withResource(m) { _ => - spilledBuffers += b.id - new MockRapidsBuffer(b.id, b.size, b.meta, b.getSpillPriority) - } + spilledBuffers += b.id + new MockRapidsBuffer(b.id, b.getPackedSizeBytes, b.getMeta, b.getSpillPriority) } class MockRapidsBuffer(id: RapidsBufferId, size: Long, meta: TableMeta, spillPriority: Long) - extends RapidsBufferBase(id, size, meta, spillPriority) { + extends RapidsBufferBase(id, meta, spillPriority) { override protected def releaseResources(): Unit = {} override val storageTier: StorageTier = StorageTier.HOST override def getMemoryBuffer: MemoryBuffer = throw new UnsupportedOperationException + + /** The size of this buffer in bytes. */ + override def getMemoryUsedBytes: Long = size } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala index 7df34c90cfd..64ee8b7f42b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala @@ -68,7 +68,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { ArgumentMatchers.eq(handle.id), ArgumentMatchers.eq(StorageTier.DEVICE)) withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.DISK)(buffer.storageTier) - assertResult(bufferSize)(buffer.size) + assertResult(bufferSize)(buffer.getMemoryUsedBytes) assertResult(handle.id)(buffer.id) assertResult(spillPriority)(buffer.getSpillPriority) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala index d1be2ae104a..176647e6a8f 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala @@ -92,7 +92,7 @@ class RapidsGdsStoreSuite extends FunSuiteWithTempDir with MockitoSugar { withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.GDS)(buffer.storageTier) assertResult(id)(buffer.id) - assertResult(size)(buffer.size) + assertResult(size)(buffer.getMemoryUsedBytes) assertResult(spillPriority)(buffer.getSpillPriority) } } @@ -126,7 +126,7 @@ class RapidsGdsStoreSuite extends FunSuiteWithTempDir with MockitoSugar { ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE)) withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.GDS)(buffer.storageTier) - assertResult(bufferSize)(buffer.size) + assertResult(bufferSize)(buffer.getMemoryUsedBytes) assertResult(bufferId)(buffer.id) assertResult(spillPriority)(buffer.getSpillPriority) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala index 0e004423272..068f17a9023 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import java.io.File import java.math.RoundingMode -import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, HostMemoryBuffer, MemoryBuffer, Table} +import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, HostMemoryBuffer, Table} import com.nvidia.spark.rapids.Arm._ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.ArgumentMatchers.any @@ -85,7 +85,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar { ArgumentMatchers.eq(handle.id), ArgumentMatchers.eq(StorageTier.DEVICE)) withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.HOST)(buffer.storageTier) - assertResult(bufferSize)(buffer.size) + assertResult(bufferSize)(buffer.getMemoryUsedBytes) assertResult(handle.id)(buffer.id) assertResult(spillPriority)(buffer.getSpillPriority) } @@ -175,7 +175,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar { override def getDiskPath(diskBlockManager: RapidsDiskBlockManager): File = null }) when(mockStore.getMaxSize).thenAnswer(_ => None) - when(mockStore.copyBuffer(any(), any(), any())).thenReturn(mockBuff) + when(mockStore.copyBuffer(any(), any())).thenReturn(mockBuff) when(mockStore.tier) thenReturn (StorageTier.DISK) withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize)) { hostStore => devStore.setSpillStore(hostStore) @@ -200,7 +200,6 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar { bigTable = null catalog.synchronousSpill(devStore, 0) verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer], - ArgumentMatchers.any[MemoryBuffer], ArgumentMatchers.any[Cuda.Stream]) withResource(catalog.acquireBuffer(bigHandle)) { buffer => assertResult(StorageTier.HOST)(buffer.storageTier) @@ -218,13 +217,9 @@ class RapidsHostMemoryStoreSuite extends FunSuite with MockitoSugar { catalog.synchronousSpill(devStore, 0) val rapidsBufferCaptor: ArgumentCaptor[RapidsBuffer] = ArgumentCaptor.forClass(classOf[RapidsBuffer]) - val memoryBufferCaptor: ArgumentCaptor[MemoryBuffer] = - ArgumentCaptor.forClass(classOf[MemoryBuffer]) verify(mockStore).copyBuffer(rapidsBufferCaptor.capture(), - memoryBufferCaptor.capture(), ArgumentMatchers.any[Cuda.Stream]) - withResource(memoryBufferCaptor.getValue) { _ => - assertResult(bigHandle.id)(rapidsBufferCaptor.getValue.id) - } + ArgumentMatchers.any[Cuda.Stream]) + assertResult(bigHandle.id)(rapidsBufferCaptor.getValue.id) } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala index f1de989b605..296f0cf2ce2 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala @@ -34,6 +34,7 @@ class RmmSparkRetrySuiteBase extends FunSuite with BeforeAndAfterEach { } val deviceStorage = new RapidsDeviceMemoryStore() val catalog = new RapidsBufferCatalog(deviceStorage) + RapidsBufferCatalog.setDeviceStorage(deviceStorage) RapidsBufferCatalog.setCatalog(catalog) val mockEventHandler = new BaseRmmEventHandler() RmmSpark.setEventHandler(mockEventHandler) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala index cc55d9c6b0f..6f2d216c1eb 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala @@ -53,6 +53,7 @@ class WithRetrySuite } val deviceStorage = new RapidsDeviceMemoryStore() val catalog = new RapidsBufferCatalog(deviceStorage) + RapidsBufferCatalog.setDeviceStorage(deviceStorage) RapidsBufferCatalog.setCatalog(catalog) val mockEventHandler = new BaseRmmEventHandler() RmmSpark.setEventHandler(mockEventHandler) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala index a8e6c789906..afc17081afb 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala @@ -227,7 +227,7 @@ class RapidsShuffleIteratorSuite extends RapidsShuffleTestHelper { assert(cl.hasNext) assertResult(cb)(cl.next()) assertResult(1)(testMetricsUpdater.totalRemoteBlocksFetched) - assertResult(mockBuffer.size)(testMetricsUpdater.totalRemoteBytesRead) + assertResult(mockBuffer.getMemoryUsedBytes)(testMetricsUpdater.totalRemoteBytesRead) assertResult(10)(testMetricsUpdater.totalRowsFetched) } finally { RmmSpark.taskDone(taskId) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala index bb958280c6a..ee8848c463d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala @@ -54,8 +54,8 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val stream = invocation.getArgument[Cuda.Stream](4) dst.copyFromMemoryBuffer(dstOffset, deviceBuffer, srcOffset, length, stream) } - when(mockBuffer.size).thenReturn(deviceBuffer.getLength) - when(mockBuffer.meta).thenReturn(mockMeta) + when(mockBuffer.getMemoryUsedBytes).thenReturn(deviceBuffer.getLength) + when(mockBuffer.getMeta).thenReturn(mockMeta) mockBuffer } } @@ -229,8 +229,8 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val bb = ByteBuffer.allocateDirect(123) withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) - when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.size).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getMeta).thenReturn(tableMeta) + when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -286,8 +286,8 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val bb = ByteBuffer.allocateDirect(123) withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) - when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.size).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getMeta).thenReturn(tableMeta) + when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -370,8 +370,8 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { def makeMockBuffer(tableId: Int, bb: ByteBuffer): RapidsBuffer = { val rapidsBuffer = mock[RapidsBuffer] val tableMeta = MetaUtils.buildTableMeta(tableId, 456, bb, 100) - when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.size).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getMeta).thenReturn(tableMeta) + when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(tableId))) .thenReturn(rapidsBuffer) rapidsBuffer diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala index 0ebd3b54d49..0998b4c7b7a 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -47,8 +47,8 @@ class SpillableColumnarBatchSuite extends FunSuite { } class MockBuffer(override val id: RapidsBufferId) extends RapidsBuffer { - override val size: Long = 123 - override val meta: TableMeta = null + override def getMemoryUsedBytes: Long = 123 + override def getMeta: TableMeta = null override val storageTier: StorageTier = StorageTier.DEVICE override def getMemoryBuffer: MemoryBuffer = null override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long, From 436234ec2314437f302c5729ebc71f87c14b983e Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 9 May 2023 22:14:15 -0500 Subject: [PATCH 02/10] Added a few tests for non-contiguous tables --- .../rapids/RapidsDeviceMemoryStore.scala | 98 +++++++----- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 143 +++++++++++++++++- 2 files changed, 201 insertions(+), 40 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index b67e4e4ca5f..da5dc6f2b64 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -18,6 +18,8 @@ package com.nvidia.spark.rapids import java.util.concurrent.ConcurrentHashMap +import scala.collection.mutable + import ai.rapids.cudf.{ColumnVector, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.StorageTier.StorageTier @@ -131,13 +133,14 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) table: Table, initialSpillPriority: Long, needsSync: Boolean): RapidsBuffer = { - val rapidsBuffer = new RapidsTable( + val rapidsTable = new RapidsTable( id, table, initialSpillPriority) - freeOnExcept(rapidsBuffer) { _ => - addBuffer(rapidsBuffer, needsSync) - rapidsBuffer + freeOnExcept(rapidsTable) { _ => + addBuffer(rapidsTable, needsSync) + rapidsTable.updateSpillability() + rapidsTable } } @@ -170,22 +173,27 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) * inside of the `ColumnVector` lock. * @param rapidsTable the `RapidsTable` this handler was associated with * @param columnIx the index of the column that this handler belongs to + * @param repetitionCount the number of times that this column appeared in `RapidsTable` * @param wrapped an optional RapidsDeviceColumnEventHandler that could be * not None if this column has been added multiple times to the * spill store. */ class RapidsDeviceColumnEventHandler( val rapidsTable: RapidsTable, - columnIx: Int, + column: ColumnVector, + repetitionCount: Int, var wrapped: Option[RapidsDeviceColumnEventHandler] = None) extends ColumnVector.EventHandler { override def onClosed(refCount: Int): Unit = { - // We trigger callbacks iff we reach `refCount` of 1 for this column. + // We trigger callbacks iff we reach `refCount` of repetitionCount for this column. + // repetitionCount == 1 for a column that is not repeated in a table, so this means + // we are looking for a refCount of 1, but if the column is aliased several times in the + // table, refCount will be equal to the number of aliases (aka repetitionCount). // This signals the `RapidsTable` that a column at index `columnIx` has become // spillable again. - if (refCount == 1) { - rapidsTable.onColumnSpillable(columnIx) + if (refCount == repetitionCount) { + rapidsTable.onColumnSpillable(column) wrapped.foreach(_.onClosed(refCount)) } } @@ -213,21 +221,6 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) null, spillPriority) { - // we register our event callbacks as the very first action to deal with - // spillability - registerOnCloseEventHandler() - - // By default all columns are NOT spillable since we are not the only owners of - // the columns (the caller is holding onto a ColumnarBatch that will be closed - // after instantiation, triggering onClosed callbacks) - // This hash set contains the columns that are currently spillable - private val columnSpillability = new ConcurrentHashMap[Int, Boolean]() - - /** Release the underlying resources for this buffer. */ - override protected def releaseResources(): Unit = { - table.close() - } - /** The storage tier for this buffer */ override val storageTier: StorageTier = StorageTier.DEVICE @@ -240,14 +233,32 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) new ChunkedPacker(id, table, chunkedPackBounceBuffer) } - override def getMeta(): TableMeta = { - chunkedPacker.getMeta - } - // This is the current size in batch form. It is to be used while this // table hasn't migrated to another store. private val unpackedSizeInBytes: Long = GpuColumnVector.getTotalDeviceMemoryUsed(table) + // By default all columns are NOT spillable since we are not the only owners of + // the columns (the caller is holding onto a ColumnarBatch that will be closed + // after instantiation, triggering onClosed callbacks) + // This hash set contains the columns that are currently spillable + private val columnSpillability = new ConcurrentHashMap[ColumnVector, Boolean]() + + private val distinctColumns = + (0 until table.getNumberOfColumns).map(table.getColumn).distinct.toArray + + // we register our event callbacks as the very first action to deal with + // spillability + registerOnCloseEventHandler() + + /** Release the underlying resources for this buffer. */ + override protected def releaseResources(): Unit = { + table.close() + } + + override def getMeta(): TableMeta = { + chunkedPacker.getMeta + } + override def getMemoryUsedBytes: Long = unpackedSizeInBytes override def getPackedSizeBytes: Long = getChunkedPacker.getTotalContiguousSize @@ -262,9 +273,17 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) * * @param columnIx - index of column to mark spillable */ - def onColumnSpillable(columnIx: Int): Unit = { - columnSpillability.put(columnIx, true) - doSetSpillable(this, columnSpillability.size == table.getNumberOfColumns) + def onColumnSpillable(column: ColumnVector): Unit = { + columnSpillability.put(column, true) + updateSpillability() + } + + /** + * This is called after adding this RapidsTable to the spillable store + * in order to update its spillability status. + */ + def updateSpillability(): Unit = { + doSetSpillable(this, columnSpillability.size == distinctColumns.size) } /** @@ -304,22 +323,33 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) private def registerOnCloseEventHandler(): Unit = { val cudfColumns = (0 until table.getNumberOfColumns).map(table.getColumn) - cudfColumns.zipWithIndex.foreach { case (cv, columnIx) => + // cudfColumns could contain duplicates. We need to take this into account when we are + // deciding the floor refCount for a duplicated column + val repetitionPerColumn = new mutable.HashMap[ColumnVector, Int]() + cudfColumns.foreach { col => + val repetitionCount = repetitionPerColumn.getOrElse(col, 0) + repetitionPerColumn(col) = repetitionCount + 1 + } + + distinctColumns.foreach { cv => cv.synchronized { val priorEventHandler = cv.getEventHandler.asInstanceOf[RapidsDeviceColumnEventHandler] val columnEventHandler = new RapidsDeviceColumnEventHandler( this, - columnIx, + cv, + repetitionPerColumn(cv), Option(priorEventHandler)) cv.setEventHandler(columnEventHandler) + if (cv.getRefCount == repetitionPerColumn(cv)) { + onColumnSpillable(cv) + } } } } private def removeOnCloseEventHandler(): Unit = { - val cudfColumns = (0 until table.getNumberOfColumns).map(table.getColumn) - cudfColumns.foreach { cv => + distinctColumns.foreach { cv => cv.synchronized { cv.getEventHandler match { case handler: RapidsDeviceColumnEventHandler => diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 2246b6d2117..6ed6869472f 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -21,7 +21,7 @@ import java.math.RoundingMode import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} +import ai.rapids.cudf.{ColumnVector, ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -31,16 +31,32 @@ import org.scalatest.FunSuite import org.scalatest.mockito.MockitoSugar import org.apache.spark.sql.rapids.RapidsDiskBlockManager -import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType} +import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, FloatType, IntegerType, StringType} class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { - private def buildContiguousTable(): ContiguousTable = { - withResource(new Table.TestBuilder() + private def buildTable(): Table = { + new Table.TestBuilder() .column(5, null.asInstanceOf[java.lang.Integer], 3, 1) .column("five", "two", null, null) .column(5.0, 2.0, 3.0, 1.0) .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123) - .build()) { table => + .build() + } + + private def buildTableWithDuplicate(): Table = { + withResource(ColumnVector.fromInts(5, null.asInstanceOf[java.lang.Integer], 3, 1)) { intCol => + withResource(ColumnVector.fromStrings("five", "two", null, null)) { stringCol => + withResource(ColumnVector.fromDoubles(5.0, 2.0, 3.0, 1.0)) { doubleCol => + // add intCol twice + new Table(intCol, intCol, stringCol, doubleCol) + } + } + } + } + + + private def buildContiguousTable(): ContiguousTable = { + withResource(buildTable()) { table => table.contiguousSplit()(0) } } @@ -62,7 +78,122 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { } } - test("a table is not spillable until the owner closes it") { + test("a non-contiguous table is spillable and it is handed over to the store") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTable() + catalog.addTable(table, spillPriority) + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("a non-contiguous table becomes non-spillable when batch is obtained") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTable() + val handle = catalog.addTable(table, spillPriority) + val types: Array[DataType] = + Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + val batch = withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => + rapidsBuffer.getColumnarBatch(types) + } + withResource(batch) { _ => + assertResult(buffSize)(store.currentSize) + assertResult(0)(store.currentSpillableSize) + } + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("an aliased non-contiguous table is not spillable (until closing the alias) ") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTable() + val handle = catalog.addTable(table, spillPriority) + val types: Array[DataType] = + Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + val aliasHandle = withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => + // extract the batch from the table we added, and add it back as a batch + withResource(rapidsBuffer.getColumnarBatch(types)) { batch => + catalog.addBatch(batch, spillPriority) + } + } // we now have two copies in the store + assertResult(buffSize*2)(store.currentSize) + assertResult(0)(store.currentSpillableSize) + + aliasHandle.close() // remove the alias + + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("an aliased non-contiguous table is not spillable (until closing the original) ") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTable() + val handle = catalog.addTable(table, spillPriority) + val types: Array[DataType] = + Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => + // extract the batch from the table we added, and add it back as a batch + withResource(rapidsBuffer.getColumnarBatch(types)) { batch => + catalog.addBatch(batch, spillPriority) + } + } // we now have two copies in the store + assertResult(buffSize * 2)(store.currentSize) + assertResult(0)(store.currentSpillableSize) + + handle.close() // remove the original + + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("an non-contiguous table supports duplicated columns") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTableWithDuplicate() + val handle = catalog.addTable(table, spillPriority) + val types: Array[DataType] = + Seq(IntegerType, IntegerType, StringType, DoubleType).toArray + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => + // extract the batch from the table we added, and add it back as a batch + withResource(rapidsBuffer.getColumnarBatch(types)) { batch => + catalog.addBatch(batch, spillPriority) + } + } // we now have two copies in the store + assertResult(buffSize * 2)(store.currentSize) + assertResult(0)(store.currentSpillableSize) + + handle.close() // remove the original + + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("a contiguous table is not spillable until the owner closes it") { withResource(new RapidsDeviceMemoryStore) { store => val catalog = spy(new RapidsBufferCatalog(store)) val spillPriority = 3 From 7766720759f32d997e39feafc22f1db252d1fcbb Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 10 May 2023 14:13:16 -0500 Subject: [PATCH 03/10] Add test showing that incRefCounting a column keeps the table non-spillable --- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 6ed6869472f..d5904dc194c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -54,7 +54,6 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { } } - private def buildContiguousTable(): ContiguousTable = { withResource(buildTable()) { table => table.contiguousSplit()(0) @@ -112,7 +111,38 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { } } - test("an aliased non-contiguous table is not spillable (until closing the alias) ") { + test("a non-contiguous table is non-spillable until all columns are returned") { + withResource(new RapidsDeviceMemoryStore) { store => + val catalog = spy(new RapidsBufferCatalog(store)) + val spillPriority = 3 + val table = buildTable() + val handle = catalog.addTable(table, spillPriority) + val types: Array[DataType] = + Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) + assertResult(buffSize)(store.currentSize) + assertResult(buffSize)(store.currentSpillableSize) + // incRefCount all the columns via `batch` + val batch = withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => + rapidsBuffer.getColumnarBatch(types) + } + val columns = GpuColumnVector.extractBases(batch) + withResource(columns.head) { _ => + columns.head.incRefCount() + withResource(batch) { _ => + assertResult(buffSize)(store.currentSize) + assertResult(0)(store.currentSpillableSize) + } + // still 0 after the batch is closed, because of the extra incRefCount + // for columns.head + assertResult(0)(store.currentSpillableSize) + } + // columns.head is closed, so now our RapidsTable is spillable again + assertResult(buffSize)(store.currentSpillableSize) + } + } + + test("an aliased non-contiguous table is not spillable (until closing the alias) ") { withResource(new RapidsDeviceMemoryStore) { store => val catalog = spy(new RapidsBufferCatalog(store)) val spillPriority = 3 From 0f9e75b474f61d66494f851bb1e96123af08ac6c Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 19 May 2023 10:55:53 -0500 Subject: [PATCH 04/10] Address review comments + make the RapidsTable->Column registration more obvious --- .../spark/rapids/GpuDeviceManager.scala | 6 +- .../nvidia/spark/rapids/RapidsBuffer.scala | 7 +- .../spark/rapids/RapidsBufferCatalog.scala | 8 +- .../spark/rapids/RapidsBufferStore.scala | 13 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 2 + .../rapids/RapidsDeviceMemoryStore.scala | 170 ++++++++++-------- .../nvidia/spark/rapids/RapidsDiskStore.scala | 2 +- .../nvidia/spark/rapids/RapidsGdsStore.scala | 4 +- .../spark/rapids/RapidsHostMemoryStore.scala | 36 ++-- .../rapids/shuffle/BufferSendState.scala | 2 +- .../rapids/RapidsBufferCatalogSuite.scala | 2 +- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 4 +- .../shuffle/RapidsShuffleServerSuite.scala | 8 +- .../rapids/SpillableColumnarBatchSuite.scala | 2 +- 14 files changed, 141 insertions(+), 125 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index 25784678478..6b8938b4083 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -145,7 +145,9 @@ object GpuDeviceManager extends Logging { def shutdown(): Unit = synchronized { // assume error during shutdown until we complete it - chunkedPackMemoryResource.close() + if (chunkedPackMemoryResource != null) { + chunkedPackMemoryResource.close() + } singletonMemoryInitialized = Errored RapidsBufferCatalog.close() GpuShuffleEnv.shutdown() @@ -256,7 +258,7 @@ object GpuDeviceManager extends Logging { val poolSize = conf.chunkedPackPoolSize chunkedPackMemoryResource = new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize) - logInfo( + logDebug( s"Initialized pool resource for spill operations " + s"of ${chunkedPackMemoryResource.getMaxSize} Bytes") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index f46455c0479..7004e18d735 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -222,7 +222,7 @@ trait RapidsBuffer extends AutoCloseable { new RapidsBufferCopyIterator(this) /** Descriptor for how the memory buffer is formatted */ - def getMeta: TableMeta + def meta: TableMeta /** The storage tier for this buffer */ val storageTier: StorageTier @@ -322,7 +322,7 @@ trait RapidsBuffer extends AutoCloseable { */ sealed class DegenerateRapidsBuffer( override val id: RapidsBufferId, - val meta: TableMeta) extends RapidsBuffer { + override val meta: TableMeta) extends RapidsBuffer { override def getMemoryUsedBytes: Long = 0L @@ -362,7 +362,4 @@ sealed class DegenerateRapidsBuffer( override def setSpillPriority(priority: Long): Unit = {} override def close(): Unit = {} - - /** Descriptor for how the memory buffer is formatted */ - override def getMeta: TableMeta = meta } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 5848f395cfe..aec7e7ca22e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -323,7 +323,7 @@ class RapidsBufferCatalog( * Adds a batch to the device storage. This does NOT take ownership of the * batch, so it is the responsibility of the caller to close it. * - * @param batch batch that will be owned by the store + * @param batch batch that will be added to the store * @param initialSpillPriority starting spill priority value for the batch * @param needsSync whether the spill framework should stream synchronize while adding * this batch (defaults to true) @@ -341,7 +341,9 @@ class RapidsBufferCatalog( /** * Adds a table to the device storage. * - * This takes ownership of the table. + * This takes ownership of the table. The reason for this is that tables + * don't have a reference count, so we cannot cleanly capture ownership by increasing + * ref count and decreasing from the caller. * * @param table table that will be owned by the store * @param initialSpillPriority starting spill priority value @@ -451,7 +453,7 @@ class RapidsBufferCatalog( if (buffers == null || buffers.isEmpty) { throw new NoSuchElementException(s"Cannot locate buffer associated with ID: $id") } - buffers.head.getMeta + buffers.head.meta } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 19ea2765943..64ae09ae28a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -246,7 +246,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) /** Base class for all buffers in this store. */ abstract class RapidsBufferBase( override val id: RapidsBufferId, - val meta: TableMeta, + _meta: TableMeta, initialSpillPriority: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton) extends RapidsBuffer { @@ -258,11 +258,11 @@ abstract class RapidsBufferStore(val tier: StorageTier) private[this] var spillPriority: Long = initialSpillPriority + def meta: TableMeta = _meta + /** Release the underlying resources for this buffer. */ protected def releaseResources(): Unit - override def getMeta(): TableMeta = meta - /** * Materialize the memory buffer from the underlying storage. * @@ -295,12 +295,11 @@ abstract class RapidsBufferStore(val tier: StorageTier) protected def columnarBatchFromDeviceBuffer(devBuffer: DeviceMemoryBuffer, sparkTypes: Array[DataType]): ColumnarBatch = { - val tableMeta = getMeta() - val bufferMeta = tableMeta.bufferMeta() + val bufferMeta = meta.bufferMeta() if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { - MetaUtils.getBatchFromMeta(devBuffer, tableMeta, sparkTypes) + MetaUtils.getBatchFromMeta(devBuffer, meta, sparkTypes) } else { - GpuCompressedColumnVector.from(devBuffer, tableMeta) + GpuCompressedColumnVector.from(devBuffer, meta) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 8675d7e4b8e..a2933ff4e1b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1859,12 +1859,14 @@ object RapidsConf { val CHUNKED_PACK_POOL_SIZE = conf("spark.rapids.sql.chunkedPack.poolSize") .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + "scratch space, needed during spill from GPU to host memory.") + .internal() .bytesConf(ByteUnit.BYTE) .createWithDefault(10L*1024*1024) val CHUNKED_PACK_BOUNCE_BUFFER_SIZE = conf("spark.rapids.sql.chunkedPack.bounceBufferSize") .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + "bounce buffer, needed during spill from GPU to host memory. ") + .internal() .bytesConf(ByteUnit.BYTE) .checkValue(v => v >= 1L*1024*1024, "The chunked pack bounce buffer must be at least 1MB in size") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index da5dc6f2b64..d28f5a9362e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -32,12 +32,10 @@ import org.apache.spark.sql.vectorized.ColumnarBatch /** * Buffer storage using device memory. * @param chunkedPackBounceBufferSize this is the size of the bounce buffer to be used - * during spill in chunked_pack. The parameter defaults to 1MB, - * since that is the minimum size of this buffer, but - * ideally it should be closer to 128MB for an A100 with the - * rule-of-thumb of 1MB per SM. + * during spill in chunked_pack. The parameter defaults to 128MB, + * with a rule-of-thumb of 1MB per SM. */ -class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) +class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024) extends RapidsBufferStore(StorageTier.DEVICE) { // The RapidsDeviceMemoryStore handles spillability via ref counting @@ -69,7 +67,7 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) new RapidsDeviceMemoryBuffer( other.id, deviceBuffer.getLength, - other.getMeta, + other.meta, deviceBuffer, other.getSpillPriority) } @@ -105,7 +103,7 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) initialSpillPriority) freeOnExcept(rapidsBuffer) { _ => logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + - s"uncompressed=${rapidsBuffer.getMeta().bufferMeta.uncompressedSize}, " + + s"uncompressed=${rapidsBuffer.meta.bufferMeta.uncompressedSize}, " + s"meta_id=${tableMeta.bufferMeta.id}, " + s"meta_size=${tableMeta.bufferMeta.size}]") addBuffer(rapidsBuffer, needsSync) @@ -171,30 +169,53 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) /** * A per cuDF column event handler that handles calls to .close() * inside of the `ColumnVector` lock. - * @param rapidsTable the `RapidsTable` this handler was associated with - * @param columnIx the index of the column that this handler belongs to - * @param repetitionCount the number of times that this column appeared in `RapidsTable` - * @param wrapped an optional RapidsDeviceColumnEventHandler that could be - * not None if this column has been added multiple times to the - * spill store. */ - class RapidsDeviceColumnEventHandler( - val rapidsTable: RapidsTable, - column: ColumnVector, - repetitionCount: Int, - var wrapped: Option[RapidsDeviceColumnEventHandler] = None) + class RapidsDeviceColumnEventHandler extends ColumnVector.EventHandler { - override def onClosed(refCount: Int): Unit = { - // We trigger callbacks iff we reach `refCount` of repetitionCount for this column. - // repetitionCount == 1 for a column that is not repeated in a table, so this means - // we are looking for a refCount of 1, but if the column is aliased several times in the - // table, refCount will be equal to the number of aliases (aka repetitionCount). - // This signals the `RapidsTable` that a column at index `columnIx` has become - // spillable again. - if (refCount == repetitionCount) { - rapidsTable.onColumnSpillable(column) - wrapped.foreach(_.onClosed(refCount)) + // Every RapidsTable that references this column has an entry in this map. + // The value represents the number of times (normally 1) that a ColumnVector + // appears in the RapidsTable. This is also the ColumnVector refCount at which + // the column is considered spillable. + // The map is protected via the ColumnVector lock. + private val registration = new mutable.HashMap[RapidsTable, Int]() + + /** + * Every RapidsTable iterates through its columns and either creates + * a `ColumnTracking` object and associates it with the column's + * `eventHandler` or calls into the existing one, and registers itself. + * + * The registration has two goals: it accounts for repetition of a column + * in a `RapidsTable`. If a table has the same column repeated it must adjust + * the refCount at which this column is considered spillable. + * + * The second goal is to account for aliasing. If two tables alias this column + * we are going to mark it as non spillable. + * + * @param rapidsTable - the table that is registering itself with this tracker + */ + def register(rapidsTable: RapidsTable, repetition: Int): Unit = { + registration.put(rapidsTable, repetition) + } + + /** + * This is invoked during `RapidsTable.free` in order to remove the entry + * in `registration`. + * @param rapidsTable - the table that is de-registering itself + */ + def deregister(rapidsTable: RapidsTable): Unit = { + registration.remove(rapidsTable) + } + + // called with the cudfCv lock held from cuDF's side + override def onClosed(cudfCv: ColumnVector, refCount: Int): Unit = { + // we only handle spillability if there is a single table registered + // (no aliasing) + if (registration.size == 1) { + val (rapidsTable, spillableRefCount) = registration.head + if (spillableRefCount == refCount) { + rapidsTable.onColumnSpillable(cudfCv) + } } } } @@ -240,11 +261,11 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) // By default all columns are NOT spillable since we are not the only owners of // the columns (the caller is holding onto a ColumnarBatch that will be closed // after instantiation, triggering onClosed callbacks) - // This hash set contains the columns that are currently spillable + // This hash set contains the columns that are currently spillable. private val columnSpillability = new ConcurrentHashMap[ColumnVector, Boolean]() - private val distinctColumns = - (0 until table.getNumberOfColumns).map(table.getColumn).distinct.toArray + private val numDistinctColumns = + (0 until table.getNumberOfColumns).map(table.getColumn).distinct.size // we register our event callbacks as the very first action to deal with // spillability @@ -255,7 +276,7 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) table.close() } - override def getMeta(): TableMeta = { + override def meta: TableMeta = { chunkedPacker.getMeta } @@ -268,10 +289,9 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) } /** - * Called from RapidsDeviceColumnEventHandler.onClosed while holding onto the lock - * of a column in our table. + * Mark a column as spillable * - * @param columnIx - index of column to mark spillable + * @param column the ColumnVector to mark as spillable */ def onColumnSpillable(column: ColumnVector): Unit = { columnSpillability.put(column, true) @@ -279,11 +299,17 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) } /** - * This is called after adding this RapidsTable to the spillable store - * in order to update its spillability status. + * Update the spillability state of this RapidsTable. This is invoked from + * two places: + * + * - from the onColumnSpillable callback, which is invoked from a + * ColumnVector.EventHandler.onClosed callback. + * + * - after adding a table to the store to mark the table as spillable if + * all columns are spillable. */ def updateSpillability(): Unit = { - doSetSpillable(this, columnSpillability.size == distinctColumns.size) + doSetSpillable(this, columnSpillability.size == numDistinctColumns) } /** @@ -322,57 +348,49 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 1L*1024*1024) } private def registerOnCloseEventHandler(): Unit = { - val cudfColumns = (0 until table.getNumberOfColumns).map(table.getColumn) + val columns = (0 until table.getNumberOfColumns).map(table.getColumn) // cudfColumns could contain duplicates. We need to take this into account when we are // deciding the floor refCount for a duplicated column val repetitionPerColumn = new mutable.HashMap[ColumnVector, Int]() - cudfColumns.foreach { col => + columns.foreach { col => val repetitionCount = repetitionPerColumn.getOrElse(col, 0) repetitionPerColumn(col) = repetitionCount + 1 } - - distinctColumns.foreach { cv => - cv.synchronized { - val priorEventHandler = cv.getEventHandler.asInstanceOf[RapidsDeviceColumnEventHandler] - val columnEventHandler = - new RapidsDeviceColumnEventHandler( - this, - cv, - repetitionPerColumn(cv), - Option(priorEventHandler)) - cv.setEventHandler(columnEventHandler) - if (cv.getRefCount == repetitionPerColumn(cv)) { - onColumnSpillable(cv) + repetitionPerColumn.foreach { case (distinctCv, repetition) => + // lock the column because we are setting its event handler, and we are inspecting + // its refCount. + distinctCv.synchronized { + val eventHandler = distinctCv.getEventHandler match { + case null => + val eventHandler = new RapidsDeviceColumnEventHandler + distinctCv.setEventHandler(eventHandler) + eventHandler + case existing: RapidsDeviceColumnEventHandler => + existing + case other => + throw new IllegalStateException( + s"Invalid column event handler $other") + } + eventHandler.register(this, repetition) + if (repetition == distinctCv.getRefCount) { + onColumnSpillable(distinctCv) } } } } + // this method is called from free() private def removeOnCloseEventHandler(): Unit = { - distinctColumns.foreach { cv => - cv.synchronized { - cv.getEventHandler match { - case handler: RapidsDeviceColumnEventHandler => - // find the event handler that belongs to this rapidsBuffer - var priorEventHandler = handler - var parent = priorEventHandler - while (priorEventHandler != null && priorEventHandler.rapidsTable != this) { - parent = priorEventHandler - priorEventHandler = priorEventHandler.wrapped.orNull - } - // if our handler is at the head - if (priorEventHandler == handler) { - cv.setEventHandler(priorEventHandler.wrapped.orNull) - } else { - // remove ourselves from the chain - parent.wrapped = if (priorEventHandler != null) { - priorEventHandler.wrapped - } else { - null - } - } + val distinctColumns = + (0 until table.getNumberOfColumns).map(table.getColumn).distinct + distinctColumns.foreach { distinctCv => + distinctCv.synchronized { + distinctCv.getEventHandler match { + case eventHandler: RapidsDeviceColumnEventHandler => + eventHandler.deregister(this) case t => - throw new IllegalStateException(s"Unknown column event handler $t") + throw new IllegalStateException( + s"Invalid column event handler $t") } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index 268b85e3e10..1fcb9b94119 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -64,7 +64,7 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) id, fileOffset, incomingBuffer.getLength, - incoming.getMeta, + incoming.meta, incoming.getSpillPriority) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 9da4a9d7a9b..70160f5b9d0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -136,7 +136,7 @@ class RapidsGdsStore( path, fileOffset, deviceBuffer.getLength, - other.getMeta, + other.meta, other.getSpillPriority) } @@ -178,7 +178,7 @@ class RapidsGdsStore( currentFile, currentOffset, deviceBuffer.getLength, - other.getMeta, + other.meta, other.getSpillPriority) currentOffset += alignUp(deviceBuffer.getLength) pendingBuffers += gdsBuffer diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 1a90562416f..33b10f0164a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange, PinnedMemoryPool} -import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_DIRECT_OFFSET, HOST_MEMORY_BUFFER_PAGEABLE_OFFSET, HOST_MEMORY_BUFFER_PINNED_OFFSET} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -73,14 +73,14 @@ class RapidsHostMemoryStore( withResource(other.getCopyIterator) { otherBufferIterator => val isChunked = otherBufferIterator.isChunked val totalCopySize = otherBufferIterator.getTotalCopySize - val (hostBuffer, allocationMode) = allocateHostBuffer(totalCopySize, false) - withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ => - var hostOffset = 0L - val start = System.nanoTime() - while (otherBufferIterator.hasNext) { - val otherBuffer = otherBufferIterator.next() - withResource(otherBuffer) { _ => - try { + val (hostBuffer, allocationMode) = allocateHostBuffer(totalCopySize) + closeOnExcept(hostBuffer) { _ => + withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ => + var hostOffset = 0L + val start = System.nanoTime() + while (otherBufferIterator.hasNext) { + val otherBuffer = otherBufferIterator.next() + withResource(otherBuffer) { _ => otherBuffer match { case devBuffer: DeviceMemoryBuffer => hostBuffer.copyFromMemoryBufferAsync( @@ -89,24 +89,20 @@ class RapidsHostMemoryStore( case _ => throw new IllegalStateException("copying from buffer without device memory") } - } catch { - case e: Exception => - hostBuffer.close() - throw e } } + stream.sync() + val end = System.nanoTime() + val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong + val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong + logDebug(s"Spill to host (mode=$allocationMode, chunked=$isChunked) " + + s"size=$szMB MiB bandwidth=$bw MiB/sec") } - stream.sync() - val end = System.nanoTime() - val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong - val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong - logDebug(s"Spill to host (mode=$allocationMode, chunked=$isChunked) " + - s"size=$szMB MiB bandwidth=$bw MiB/sec") } new RapidsHostMemoryBuffer( other.id, totalCopySize, - other.getMeta, + other.meta, applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), hostBuffer, allocationMode) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala index 628f1370e18..c1d65f16980 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala @@ -82,7 +82,7 @@ class BufferSendState( val bufferTransferRequest = transferRequest.requests(btr, ix) withResource(requestHandler.acquireShuffleBuffer( bufferTransferRequest.bufferId())) { table => - bufferMetas(ix) = table.getMeta.bufferMeta() + bufferMetas(ix) = table.meta.bufferMeta() new SendBlock(bufferTransferRequest.bufferId(), table.getPackedSizeBytes) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala index 3112cc89759..2b5e592c144 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala @@ -330,7 +330,7 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { var currentPriority: Long = initialPriority override val id: RapidsBufferId = bufferId override def getMemoryUsedBytes: Long = 0 - override def getMeta: TableMeta = tableMeta + override def meta: TableMeta = tableMeta override val storageTier: StorageTier = tier override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = null override def getMemoryBuffer: MemoryBuffer = null diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index d5904dc194c..7448b6cd3b7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -329,7 +329,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { val resultBuffer = captor.getValue assertResult(bufferId)(resultBuffer.id) assertResult(spillPriority)(resultBuffer.getSpillPriority) - assertResult(meta)(resultBuffer.getMeta) + assertResult(meta)(resultBuffer.meta) } } @@ -471,7 +471,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { b: RapidsBuffer, s: Cuda.Stream): RapidsBufferBase = { spilledBuffers += b.id - new MockRapidsBuffer(b.id, b.getPackedSizeBytes, b.getMeta, b.getSpillPriority) + new MockRapidsBuffer(b.id, b.getPackedSizeBytes, b.meta, b.getSpillPriority) } class MockRapidsBuffer(id: RapidsBufferId, size: Long, meta: TableMeta, spillPriority: Long) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala index ee8848c463d..bad5b845fdd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala @@ -55,7 +55,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { dst.copyFromMemoryBuffer(dstOffset, deviceBuffer, srcOffset, length, stream) } when(mockBuffer.getMemoryUsedBytes).thenReturn(deviceBuffer.getLength) - when(mockBuffer.getMeta).thenReturn(mockMeta) + when(mockBuffer.meta).thenReturn(mockMeta) mockBuffer } } @@ -229,7 +229,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val bb = ByteBuffer.allocateDirect(123) withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) - when(rapidsBuffer.getMeta).thenReturn(tableMeta) + when(rapidsBuffer.meta).thenReturn(tableMeta) when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -286,7 +286,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val bb = ByteBuffer.allocateDirect(123) withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) - when(rapidsBuffer.getMeta).thenReturn(tableMeta) + when(rapidsBuffer.meta).thenReturn(tableMeta) when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -370,7 +370,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { def makeMockBuffer(tableId: Int, bb: ByteBuffer): RapidsBuffer = { val rapidsBuffer = mock[RapidsBuffer] val tableMeta = MetaUtils.buildTableMeta(tableId, 456, bb, 100) - when(rapidsBuffer.getMeta).thenReturn(tableMeta) + when(rapidsBuffer.meta).thenReturn(tableMeta) when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(tableId))) .thenReturn(rapidsBuffer) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala index 0998b4c7b7a..1015192f8ed 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -48,7 +48,7 @@ class SpillableColumnarBatchSuite extends FunSuite { class MockBuffer(override val id: RapidsBufferId) extends RapidsBuffer { override def getMemoryUsedBytes: Long = 123 - override def getMeta: TableMeta = null + override def meta: TableMeta = null override val storageTier: StorageTier = StorageTier.DEVICE override def getMemoryBuffer: MemoryBuffer = null override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long, From 71dc17528d0601cdc440a6346f05c8c70e4818ff Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 22 May 2023 09:16:21 -0500 Subject: [PATCH 05/10] Update docs --- docs/configs.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 5362e593166..10d67a64e88 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -41,7 +41,7 @@ Name | Description | Default Value | Applicable at spark.rapids.alluxio.replacement.algo|The algorithm used when replacing the UFS path with the Alluxio path. CONVERT_TIME and TASK_TIME are the valid options. CONVERT_TIME indicates that we do it when we convert it to a GPU file read, this has extra overhead of creating an entirely new file index, which requires listing the files and getting all new file info from Alluxio. TASK_TIME replaces the path as late as possible inside of the task. By waiting and replacing it at task time, it just replaces the path without fetching the file information again, this is faster but doesn't update locality information if that has a bit impact on performance.|TASK_TIME|Runtime spark.rapids.alluxio.slow.disk|Indicates whether the disks used by Alluxio are slow. If it's true and reading S3 large files, Rapids Accelerator reads from S3 directly instead of reading from Alluxio caches. Refer to spark.rapids.alluxio.large.file.threshold which defines a threshold that identifying whether files are large. Typically, it's slow disks if speed is less than 300M/second. If using convert time spark.rapids.alluxio.replacement.algo, this may not apply to all file types like Delta files|true|Runtime spark.rapids.alluxio.user|Alluxio user is set on the Alluxio client, which is used to mount or get information. By default it should be the user that running the Alluxio processes. The default value is ubuntu.|ubuntu|Runtime -spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: abfs, abfss, dbfs, gs, s3, s3a, s3n, wasbs. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None|Runtime +spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: abfs, abfss, dbfs, gs, s3, s3a, s3n, wasbs, cosn. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None|Runtime spark.rapids.filecache.checkStale|Controls whether the cached is checked for being out of date with respect to the input file. When enabled, the data that has been cached locally for a file will be invalidated if the file is updated after being cached. This feature is only necessary if an input file for a Spark application can be changed during the lifetime of the application. If an individual input file will not be overwritten during the Spark application then performance may be improved by setting this to false.|false|Startup spark.rapids.filecache.enabled|Controls whether the caching of input files is enabled. When enabled, input datais cached to the same local directories configured for the Spark application. The cache will use up to half the available space by default. To set an absolute cache size limit, see the spark.rapids.filecache.maxBytes configuration setting. Currently only data from Parquet files are cached.|false|Startup spark.rapids.filecache.maxBytes|Controls the maximum amount of data that will be cached locally. If left unspecified, it will use half of the available disk space detected on startup for the configured Spark local disks.|None|Startup @@ -85,8 +85,6 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|true|Runtime spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|true|Runtime spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false|Runtime -spark.rapids.sql.chunkedPack.bounceBufferSize|Amount of GPU memory (in bytes) to set aside at startup for the chunked pack bounce buffer, needed during spill from GPU to host memory. |134217728|Runtime -spark.rapids.sql.chunkedPack.poolSize|Amount of GPU memory (in bytes) to set aside at startup for the chunked pack scratch space, needed during spill from GPU to host memory.|10485760|Runtime spark.rapids.sql.coalescing.reader.numFilterParallel|This controls the number of files the coalescing reader will run in each thread when it filters blocks for reading. If this value is greater than zero the files will be filtered in a multithreaded manner where each thread filters the number of files set by this config. If this is set to zero the files are filtered serially. This uses the same thread pool as the multithreaded reader, see spark.rapids.sql.multiThreadedRead.numThreads. Note that filtering multithreaded is useful with Alluxio.|0|Runtime spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|2|Runtime spark.rapids.sql.concurrentWriterPartitionFlushSize|The flush size of the concurrent writer cache in bytes for each partition. If specified spark.sql.maxConcurrentOutputFileWriters, use concurrent writer to write data. Concurrent writer first caches data for each partition and begins to flush the data if it finds one partition with a size that is greater than or equal to this config. The default value is 0, which will try to select a size based off of file type specific configs. E.g.: It uses `write.parquet.row-group-size-bytes` config for Parquet type and `orc.stripe.size` config for Orc type. If the value is greater than 0, will use this positive value.Max value may get better performance but not always, because concurrent writer uses spillable cache and big value may cause more IO swaps.|0|Runtime From 921e526828ae0962b8c0cf5cde9bada7e338c5b6 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 22 May 2023 10:10:02 -0500 Subject: [PATCH 06/10] Remove pinned memory leak --- .../spark/rapids/GpuDeviceManager.scala | 23 ++++++++----- .../nvidia/spark/rapids/RapidsBuffer.scala | 32 ++++++++++++++++--- .../com/nvidia/spark/rapids/RapidsConf.scala | 6 +++- .../spark/rapids/RapidsHostMemoryStore.scala | 16 +++++----- 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index 7bee2c6d56e..6e3e818e29b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala @@ -45,7 +45,7 @@ object GpuDeviceManager extends Logging { // Memory resource used only for cudf::chunked_pack to allocate scratch space // during spill to host. This is done to set aside some memory for this operation // from the beginning of the job. - var chunkedPackMemoryResource: RmmPoolMemoryResource[RmmCudaMemoryResource] = null + var chunkedPackMemoryResource: Option[RmmPoolMemoryResource[RmmCudaMemoryResource]] = None // for testing only def setRmmTaskInitEnabled(enabled: Boolean): Unit = { @@ -145,10 +145,11 @@ object GpuDeviceManager extends Logging { def shutdown(): Unit = synchronized { // assume error during shutdown until we complete it - if (chunkedPackMemoryResource != null) { - chunkedPackMemoryResource.close() - } singletonMemoryInitialized = Errored + + chunkedPackMemoryResource.foreach(_.close) + chunkedPackMemoryResource = None + RapidsBufferCatalog.close() GpuShuffleEnv.shutdown() // try to avoid segfault on RMM shutdown @@ -257,10 +258,16 @@ object GpuDeviceManager extends Logging { val poolSize = conf.chunkedPackPoolSize chunkedPackMemoryResource = - new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize) - logDebug( - s"Initialized pool resource for spill operations " + - s"of ${chunkedPackMemoryResource.getMaxSize} Bytes") + if (poolSize > 0) { + val chunkedPackPool = + new RmmPoolMemoryResource(new RmmCudaMemoryResource(), poolSize, poolSize) + logDebug( + s"Initialized pool resource for spill operations " + + s"of ${chunkedPackMemoryResource.map(_.getMaxSize)} Bytes") + Some(chunkedPackPool) + } else { + None + } val info = Cuda.memGetInfo() val poolAllocation = computeRmmPoolSize(conf, info) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 7004e18d735..b53a6e3a6fb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -88,10 +88,30 @@ class ChunkedPacker( private var closed: Boolean = false - private val chunkedPack = - table.makeChunkedPack( - bounceBuffer.getLength, - GpuDeviceManager.chunkedPackMemoryResource) + // When creating cudf::chunked_pack use a pool if available, otherwise default to the + // per-device memory resource + private val chunkedPack = { + val pool = GpuDeviceManager.chunkedPackMemoryResource + val cudfChunkedPack = try { + pool.flatMap { chunkedPool => + Some(table.makeChunkedPack(bounceBuffer.getLength, chunkedPool)) + } + } catch { + case _: OutOfMemoryError => + if (!ChunkedPacker.warnedAboutPoolFallback) { + ChunkedPacker.warnedAboutPoolFallback = true + logWarning( + s"OOM while creating chunked_pack using pool sized ${pool.map(_.getMaxSize)}B. " + + "Falling back to the per-device memory resource.") + } + None + } + + // if the pool is not configured, or we got an OOM, try again with the per-device pool + cudfChunkedPack.getOrElse { + table.makeChunkedPack(bounceBuffer.getLength) + } + } private val tableMeta = withResource(chunkedPack.buildMetadata()) { packedMeta => MetaUtils.buildTableMeta( @@ -131,6 +151,10 @@ class ChunkedPacker( } } +object ChunkedPacker { + private var warnedAboutPoolFallback: Boolean = false +} + /** * This iterator encapsulates a buffer's internal `MemoryBuffer` access * for spill reasons. Internally, there are two known implementations: diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index c96d1c7f667..7e3be49648e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1858,7 +1858,11 @@ object RapidsConf { val CHUNKED_PACK_POOL_SIZE = conf("spark.rapids.sql.chunkedPack.poolSize") .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " + - "scratch space, needed during spill from GPU to host memory.") + "scratch space, needed during spill from GPU to host memory. As a rule of thumb, each " + + "column should see around 200B that will be allocated from this pool. " + + "With the default of 10MB, a table of ~60,000 columns can be spilled using only this " + + "pool. If this config is 0B, or if allocations fail, the plugin will retry with " + + "the regular GPU memory resource.") .internal() .bytesConf(ByteUnit.BYTE) .createWithDefault(10L*1024*1024) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 33b10f0164a..2a46b3f58b2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -47,7 +47,7 @@ class RapidsHostMemoryStore( preferPinned: Boolean = true): (HostMemoryBuffer, AllocationMode) = { var buffer: HostMemoryBuffer = null if (preferPinned) { - PinnedMemoryPool.tryAllocate(size) + buffer = PinnedMemoryPool.tryAllocate(size) if (buffer != null) { return (buffer, Pinned) } @@ -98,14 +98,14 @@ class RapidsHostMemoryStore( logDebug(s"Spill to host (mode=$allocationMode, chunked=$isChunked) " + s"size=$szMB MiB bandwidth=$bw MiB/sec") } + new RapidsHostMemoryBuffer( + other.id, + totalCopySize, + other.meta, + applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), + hostBuffer, + allocationMode) } - new RapidsHostMemoryBuffer( - other.id, - totalCopySize, - other.meta, - applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), - hostBuffer, - allocationMode) } } From 4f78c5671dd28fd0c975b6f7cb4f14c2d0434943 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 22 May 2023 10:38:34 -0500 Subject: [PATCH 07/10] If chunked_pack fails to initialize make sure we mark it uninitialized --- .../com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index d28f5a9362e..30a2c472101 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -250,8 +250,9 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 private var initializedChunkedPacker: Boolean = false lazy val chunkedPacker: ChunkedPacker = { + val packer = new ChunkedPacker(id, table, chunkedPackBounceBuffer) initializedChunkedPacker = true - new ChunkedPacker(id, table, chunkedPackBounceBuffer) + packer } // This is the current size in batch form. It is to be used while this From 3fd1e286f93295f450f626675f76ccac7091e03e Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 22 May 2023 15:13:16 -0500 Subject: [PATCH 08/10] Fix mocks in RapidsShuffleServerSuite --- .../spark/rapids/shuffle/RapidsShuffleServerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala index bad5b845fdd..3eb73ef0f13 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala @@ -54,7 +54,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val stream = invocation.getArgument[Cuda.Stream](4) dst.copyFromMemoryBuffer(dstOffset, deviceBuffer, srcOffset, length, stream) } - when(mockBuffer.getMemoryUsedBytes).thenReturn(deviceBuffer.getLength) + when(mockBuffer.getPackedSizeBytes).thenReturn(deviceBuffer.getLength) when(mockBuffer.meta).thenReturn(mockMeta) mockBuffer } @@ -230,7 +230,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getPackedSizeBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -287,7 +287,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { withResource(new RefCountedDirectByteBuffer(bb)) { _ => val tableMeta = MetaUtils.buildTableMeta(1, 456, bb, 100) when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getPackedSizeBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(1))) .thenReturn(rapidsBuffer) @@ -371,7 +371,7 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper { val rapidsBuffer = mock[RapidsBuffer] val tableMeta = MetaUtils.buildTableMeta(tableId, 456, bb, 100) when(rapidsBuffer.meta).thenReturn(tableMeta) - when(rapidsBuffer.getMemoryUsedBytes).thenReturn(tableMeta.bufferMeta().size()) + when(rapidsBuffer.getPackedSizeBytes).thenReturn(tableMeta.bufferMeta().size()) when(mockRequestHandler.acquireShuffleBuffer(ArgumentMatchers.eq(tableId))) .thenReturn(rapidsBuffer) rapidsBuffer From a5b008f292a6908339c1185e931f1d4b19ed568f Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 22 May 2023 15:40:42 -0500 Subject: [PATCH 09/10] Fix test issues --- .../rapids/RapidsBufferCatalogSuite.scala | 61 ++++++++++--------- .../rapids/RapidsDeviceMemoryStoreSuite.scala | 12 ++-- 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala index 2b5e592c144..5950617ea48 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala @@ -212,29 +212,39 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { } test("multiple calls to unspill return existing DEVICE buffer") { - val deviceStore = spy(new RapidsDeviceMemoryStore) - val mockStore = mock[RapidsBufferStore] - withResource( - new RapidsHostMemoryStore(10000, 1000)) { hostStore => - deviceStore.setSpillStore(hostStore) - hostStore.setSpillStore(mockStore) - val catalog = new RapidsBufferCatalog(deviceStore) - val handle = withResource(DeviceMemoryBuffer.allocate(1024)) { buff => - val meta = MetaUtils.getTableMetaNoTable(buff) - catalog.addBuffer( - buff, meta, -1) - } - withResource(handle) { _ => - catalog.synchronousSpill(deviceStore, 0) - val acquiredHostBuffer = catalog.acquireBuffer(handle) - val unspilled = withResource(acquiredHostBuffer) { _ => - assertResult(HOST)(acquiredHostBuffer.storageTier) - val unspilled = - catalog.unspillBufferToDeviceStore( + withResource(spy(new RapidsDeviceMemoryStore)) { deviceStore => + val mockStore = mock[RapidsBufferStore] + withResource( + new RapidsHostMemoryStore(10000, 1000)) { hostStore => + deviceStore.setSpillStore(hostStore) + hostStore.setSpillStore(mockStore) + val catalog = new RapidsBufferCatalog(deviceStore) + val handle = withResource(DeviceMemoryBuffer.allocate(1024)) { buff => + val meta = MetaUtils.getTableMetaNoTable(buff) + catalog.addBuffer( + buff, meta, -1) + } + withResource(handle) { _ => + catalog.synchronousSpill(deviceStore, 0) + val acquiredHostBuffer = catalog.acquireBuffer(handle) + val unspilled = withResource(acquiredHostBuffer) { _ => + assertResult(HOST)(acquiredHostBuffer.storageTier) + val unspilled = + catalog.unspillBufferToDeviceStore( + acquiredHostBuffer, + Cuda.DEFAULT_STREAM) + withResource(unspilled) { _ => + assertResult(DEVICE)(unspilled.storageTier) + } + val unspilledSame = catalog.unspillBufferToDeviceStore( acquiredHostBuffer, Cuda.DEFAULT_STREAM) - withResource(unspilled) { _ => - assertResult(DEVICE)(unspilled.storageTier) + withResource(unspilledSame) { _ => + assertResult(unspilled)(unspilledSame) + } + // verify that we invoked the copy function exactly once + verify(deviceStore, times(1)).copyBuffer(any(), any()) + unspilled } val unspilledSame = catalog.unspillBufferToDeviceStore( acquiredHostBuffer, @@ -244,16 +254,7 @@ class RapidsBufferCatalogSuite extends FunSuite with MockitoSugar { } // verify that we invoked the copy function exactly once verify(deviceStore, times(1)).copyBuffer(any(), any()) - unspilled - } - val unspilledSame = catalog.unspillBufferToDeviceStore( - acquiredHostBuffer, - Cuda.DEFAULT_STREAM) - withResource(unspilledSame) { _ => - assertResult(unspilled)(unspilledSame) } - // verify that we invoked the copy function exactly once - verify(deviceStore, times(1)).copyBuffer(any(), any()) } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 32eef46bb0b..3b3b3820b07 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -31,14 +31,14 @@ import org.scalatest.FunSuite import org.scalatest.mockito.MockitoSugar import org.apache.spark.sql.rapids.RapidsDiskBlockManager -import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, FloatType, IntegerType, StringType} +import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType} class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { private def buildTable(): Table = { new Table.TestBuilder() .column(5, null.asInstanceOf[java.lang.Integer], 3, 1) .column("five", "two", null, null) - .column(5.0, 2.0, 3.0, 1.0) + .column(5.0D, 2.0D, 3.0D, 1.0D) .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123) .build() } @@ -96,7 +96,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { val table = buildTable() val handle = catalog.addTable(table, spillPriority) val types: Array[DataType] = - Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) assertResult(buffSize)(store.currentSize) assertResult(buffSize)(store.currentSpillableSize) @@ -118,7 +118,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { val table = buildTable() val handle = catalog.addTable(table, spillPriority) val types: Array[DataType] = - Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) assertResult(buffSize)(store.currentSize) assertResult(buffSize)(store.currentSpillableSize) @@ -149,7 +149,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { val table = buildTable() val handle = catalog.addTable(table, spillPriority) val types: Array[DataType] = - Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) assertResult(buffSize)(store.currentSize) assertResult(buffSize)(store.currentSpillableSize) @@ -176,7 +176,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with MockitoSugar { val table = buildTable() val handle = catalog.addTable(table, spillPriority) val types: Array[DataType] = - Seq(IntegerType, StringType, FloatType, DecimalType(10, 5)).toArray + Seq(IntegerType, StringType, DoubleType, DecimalType(10, 5)).toArray val buffSize = GpuColumnVector.getTotalDeviceMemoryUsed(table) assertResult(buffSize)(store.currentSize) assertResult(buffSize)(store.currentSpillableSize) From c3c217941b01c00e53ba47f78f616124f3c7fd48 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Mon, 22 May 2023 16:25:49 -0500 Subject: [PATCH 10/10] One more test failure --- .../com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala index 6e8500da1b7..ea9d4b88ccc 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala @@ -204,7 +204,7 @@ class GpuCoalesceBatchesRetrySuite val batches = iter.asInstanceOf[CoalesceIteratorMocks].getBatches() assertResult(10)(batches.length) batches.foreach(b => - verify(b, times(3)).close() + verify(b, times(1)).close() ) } }