Enables spillable/unspillable state for RapidsBuffer and allow buffer sharing #7572

Merged

26 commits. The file diffs shown further below reflect changes from 2 of these commits.
ae399bf
Enables RapidsBuffer sharing in the spill framework
abellina Jan 16, 2023
1d4bb47
Scalastyle
abellina Jan 24, 2023
a64a3d3
Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer…
abellina Jan 24, 2023
eb94df7
If we keep holding onto the batch we won't spill to disk
abellina Jan 24, 2023
5f9ac7a
Merge branch 'spill/rapids_buffer_handle_dedup_final' of github.com:a…
abellina Jan 24, 2023
3c5c48c
Remove synchronize from DeviceMemoryEventHandler
abellina Jan 25, 2023
99771ac
Move definition of setSpillable to let data members be on top
abellina Jan 25, 2023
8737302
Lock the underlying buffer every time we add, to prevent multiple Rap…
abellina Jan 25, 2023
e6632d0
Add spillableOnAdd so that stores can expose whether they intend buff…
abellina Jan 25, 2023
51673a1
Retry allocation instead of redundant spilling if multiple threads tr…
abellina Jan 25, 2023
a689ed2
Minor whitespace change
abellina Jan 26, 2023
e25f576
Merge branch 'branch-23.02' of https://github.com/NVIDIA/spark-rapids…
abellina Jan 30, 2023
6ccdbca
Merge branch 'branch-23.04' of https://github.com/NVIDIA/spark-rapids…
abellina Jan 30, 2023
a38ea73
Rework locking always taking catalog lock first
abellina Jan 26, 2023
d762557
Fix typo
abellina Jan 31, 2023
c470791
Fix log messages
abellina Jan 31, 2023
139b09d
Code review comments
abellina Feb 3, 2023
f96b644
fix imports and build issue
abellina Feb 3, 2023
7f8521c
Fix RapidsHostMemoryStoreSuite and unspill test
abellina Feb 6, 2023
bbc7c54
Take care of some of the feedback
abellina Feb 6, 2023
bc2a215
Removes waitForPending from the RapidsBufferStore, address a few more…
abellina Feb 6, 2023
a0b039a
getExistingRapidsBufferAndAcquire private and static
abellina Feb 6, 2023
93c0007
Require that callbacks are null when adding buffers to the device store
abellina Feb 7, 2023
1cf8c04
Don't forget to set the spill store
abellina Feb 7, 2023
00aa5ca
Add third argument matcher when mocking synchronousSpill
abellina Feb 7, 2023
6cca74d
In RapidsGdsStoreSuite use the provided catalog
abellina Feb 8, 2023
DeviceMemoryEventHandler.scala

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ class DeviceMemoryEventHandler(

   /**
    * A small helper class that helps keep track of retry counts as we trigger
-   * synchronizes on a depleted store.
+   * synchronizes on a completely spilled store.
    */
   class OOMRetryState {
     private var synchronizeAttempts = 0
@@ -82,19 +82,19 @@ class DeviceMemoryEventHandler(
     }

     /**
-     * We reset our counters if `storeSize` is non-zero (as we only track when the store is
-     * depleted), or `retryCount` is less than or equal to what we had previously
-     * recorded in `shouldTrySynchronizing`. We do this to detect that the new failures
-     * are for a separate allocation (we need to give this new attempt a new set of
-     * retries.)
+     * We reset our counters if `storeSpillableSize` is non-zero (as we only track when all
+     * spillable buffers are removed from the store), or `retryCount` is less than or equal
+     * to what we had previously recorded in `shouldTrySynchronizing`.
+     * We do this to detect that the new failures are for a separate allocation (we need to
+     * give this new attempt a new set of retries.)
      *
-     * For example, if an allocation fails and we deplete the store, `retryCountLastSynced`
+     * For example, if an allocation fails and we spill all eligible buffers, `retryCountLastSynced`
      * is set to the last `retryCount` sent to us by cuDF as we keep allowing retries
      * from cuDF. If we succeed, cuDF resets `retryCount`, and so the new count sent to us
      * must be <= than what we saw last, so we can reset our tracking.
      */
-    def resetIfNeeded(retryCount: Int, storeSize: Long): Unit = {
-      if (storeSize != 0 || retryCount <= retryCountLastSynced) {
+    def resetIfNeeded(retryCount: Int, storeSpillableSize: Long): Unit = {
+      if (storeSpillableSize != 0 || retryCount <= retryCountLastSynced) {
         reset()
       }
     }
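
The retry bookkeeping above is easier to follow in one self-contained piece. The sketch below restates the idea with illustrative names (RetryTracker, maxSyncAttempts) rather than the plugin's actual members; the real OOMRetryState is only partially visible in this diff.

// Minimal sketch of the retry bookkeeping; RetryTracker and maxSyncAttempts are
// hypothetical stand-ins for the OOMRetryState shown above.
class RetryTracker(maxSyncAttempts: Int) {
  private var syncAttempts = 0
  private var retryCountLastSynced = 0

  // Record a synchronize attempt for this retryCount; true while attempts remain.
  def shouldTrySynchronizing(retryCount: Int): Boolean = {
    retryCountLastSynced = retryCount
    syncAttempts += 1
    syncAttempts <= maxSyncAttempts
  }

  // Start over when either condition signals a brand new allocation attempt:
  // spillable bytes reappeared in the store, or cuDF handed us a retryCount no
  // larger than the last one we synced on (so the previous allocation succeeded).
  def resetIfNeeded(retryCount: Int, storeSpillableSize: Long): Unit = {
    if (storeSpillableSize != 0 || retryCount <= retryCountLastSynced) {
      syncAttempts = 0
      retryCountLastSynced = 0
    }
  }
}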
@@ -104,32 +104,37 @@ class DeviceMemoryEventHandler(
    * Handles RMM allocation failures by spilling buffers from device memory.
    * @param allocSize the byte amount that RMM failed to allocate
    * @param retryCount the number of times this allocation has been retried after failure
+   * @note this handler locks any other threads out that are also OOMing. This is done
+   *       such that that each thread that needs to spill has an accurate count of the
+   *       bytes it must move from the device store to other stores.
    * @return true if allocation should be reattempted or false if it should fail
    */
-  override def onAllocFailure(allocSize: Long, retryCount: Int): Boolean = {
+  override def onAllocFailure(allocSize: Long, retryCount: Int): Boolean = synchronized {
     // check arguments for good measure
-    require(allocSize >= 0, 
+    require(allocSize >= 0,
       s"onAllocFailure invoked with invalid allocSize $allocSize")

-    require(retryCount >= 0, 
+    require(retryCount >= 0,
       s"onAllocFailure invoked with invalid retryCount $retryCount")

     try {
       withResource(new NvtxRange("onAllocFailure", NvtxColor.RED)) { _ =>
         val storeSize = store.currentSize
+        val storeSpillableSize = store.currentSpillableSize

         val attemptMsg = if (retryCount > 0) {
           s"Attempt ${retryCount}. "
         } else {
           "First attempt. "
         }

         val retryState = oomRetryState.get()
-        retryState.resetIfNeeded(retryCount, storeSize)
+        retryState.resetIfNeeded(retryCount, storeSpillableSize)

         logInfo(s"Device allocation of $allocSize bytes failed, device store has " +
-          s"$storeSize bytes. $attemptMsg" +
+          s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg" +
           s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes. ")
-        if (storeSize == 0) {
+        if (storeSpillableSize == 0) {
           if (retryState.shouldTrySynchronizing(retryCount)) {
             Cuda.deviceSynchronize()
             logWarning(s"[RETRY ${retryState.getRetriesSoFar}] " +
@@ -149,7 +154,7 @@ class DeviceMemoryEventHandler(
             false
           }
         } else {
-          val targetSize = Math.max(storeSize - allocSize, 0)
+          val targetSize = Math.max(storeSpillableSize - allocSize, 0)
           logDebug(s"Targeting device store size of $targetSize bytes")
           val amountSpilled = store.synchronousSpill(targetSize)
           logInfo(s"Spilled $amountSpilled bytes from the device store")
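Taken together, the two hunks above reduce the handler to a small decision: if nothing is spillable, synchronize the device and let cuDF retry a bounded number of times; otherwise spill just enough to fit the failed allocation and retry. The sketch below is a condensed, self-contained illustration of that flow only; the parameter names and the synchronizeDevice/spill function arguments are stand-ins for Cuda.deviceSynchronize() and store.synchronousSpill(...), and the real method is additionally synchronized and logs each step.

object AllocFailureFlowSketch {
  // Stand-in for the onAllocFailure decision flow shown in the diff above.
  def onAllocFailure(
      allocSize: Long,
      spillableBytes: Long,
      syncRetriesRemain: Boolean,
      synchronizeDevice: () => Unit,
      spill: Long => Long): Boolean = {
    if (spillableBytes == 0) {
      // Nothing left to spill: synchronize and let cuDF retry a bounded number
      // of times, in case other threads are still releasing device memory.
      if (syncRetriesRemain) {
        synchronizeDevice()
        true // ask cuDF to retry the allocation
      } else {
        false // give up and let the allocation fail (OOM)
      }
    } else {
      // Spill just enough so the failed allocation can fit, then retry.
      val targetSize = math.max(spillableBytes - allocSize, 0L)
      spill(targetSize) // returns the bytes spilled; the real handler logs this
      true
    }
  }
}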
RapidsBufferCatalog.scala

@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.BiFunction

-import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, Rmm, Table}
+import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, Rmm}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
@@ -55,7 +55,7 @@ trait RapidsBufferHandle extends AutoCloseable {
  * Catalog for lookup of buffers by ID. The constructor is only visible for testing, generally
  * `RapidsBufferCatalog.singleton` should be used instead.
  */
-class RapidsBufferCatalog extends AutoCloseable with Arm {
+class RapidsBufferCatalog extends AutoCloseable with Arm with Logging {

   /** Map of buffer IDs to buffers sorted by storage tier */
   private[this] val bufferMap = new ConcurrentHashMap[RapidsBufferId, Seq[RapidsBuffer]]
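
For context on the later hunks, the bufferMap above keys every copy of a buffer by its id, with one entry per storage tier it currently occupies, sorted by tier. The sketch below is a rough, self-contained illustration using simplified stand-in types (a Long id and an Int tier) instead of the real RapidsBufferId and RapidsBuffer.

import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Rough illustration of the bufferMap shape: one id can map to copies of the same
// buffer at several storage tiers (e.g. device and disk after a spill), kept sorted
// so the lowest (fastest) tier comes first. Types here are hypothetical stand-ins.
object BufferMapSketch {
  type BufferId = Long
  final case class Buffer(id: BufferId, tier: Int) // lower tier == faster storage

  private val bufferMap = new ConcurrentHashMap[BufferId, Seq[Buffer]]()

  private val mergeSorted: BiFunction[Seq[Buffer], Seq[Buffer], Seq[Buffer]] =
    (old, added) => (old ++ added).sortBy(_.tier)

  def track(b: Buffer): Unit = bufferMap.merge(b.id, Seq(b), mergeSorted)

  def fastestCopy(id: BufferId): Option[Buffer] =
    Option(bufferMap.get(id)).flatMap(_.headOption)
}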
@@ -330,6 +330,7 @@ class RapidsBufferCatalog extends AutoCloseable with Arm {
   private def removeBuffer(handle: RapidsBufferHandle): Boolean = {
     // if this is the last handle, remove the buffer
     if (stopTrackingHandle(handle)) {
+      logDebug(s"Removing buffer ${handle.id}")
       val buffers = bufferMap.remove(handle.id)
       buffers.safeFree()
       true
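
The added logDebug sits in the path that makes buffer sharing work: a buffer is only removed once stopTrackingHandle reports that its last handle is gone. Below is a minimal sketch of that contract with simplified stand-in types; the real RapidsBufferHandle/RapidsBuffer code also deals with spill priorities and concurrency.

import scala.collection.mutable

// Simplified stand-in types: a SharedBuffer is freed only when the last Handle
// pointing at it is closed, mirroring removeBuffer/stopTrackingHandle above.
object HandleSharingSketch {
  final class SharedBuffer(val id: Long) {
    var freed = false
    def free(): Unit = freed = true
  }

  private val openHandles = mutable.Map.empty[Long, Int].withDefaultValue(0)

  final class Handle(val buffer: SharedBuffer) extends AutoCloseable {
    openHandles(buffer.id) += 1
    override def close(): Unit = {
      openHandles(buffer.id) -= 1
      if (openHandles(buffer.id) == 0) {
        openHandles.remove(buffer.id)
        buffer.free() // last handle closed: the buffer itself is removed and freed
      }
    }
  }

  def demo(): Unit = {
    val buf = new SharedBuffer(1)
    val h1 = new Handle(buf) // first consumer
    val h2 = new Handle(buf) // second consumer shares the same buffer
    h1.close()               // buffer stays alive, h2 still needs it
    h2.close()               // last handle closed, buffer is freed
    assert(buf.freed)
  }
}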
@@ -445,25 +446,6 @@ object RapidsBufferCatalog extends Logging with Arm {

   def shouldUnspill: Boolean = _shouldUnspill

-  /**
-   * Adds a contiguous table to the device storage, taking ownership of the table.
-   * @param table cudf table based from the contiguous buffer
-   * @param contigBuffer device memory buffer backing the table
-   * @param tableMeta metadata describing the buffer layout
-   * @param initialSpillPriority starting spill priority value for the buffer
-   * @param spillCallback a callback when the buffer is spilled. This should be very light weight.
-   *                      It should never allocate GPU memory and really just be used for metrics.
-   * @return RapidsBufferHandle associated with this buffer
-   */
-  def addTable(
-      table: Table,
-      contigBuffer: DeviceMemoryBuffer,
-      tableMeta: TableMeta,
-      initialSpillPriority: Long,
-      spillCallback: SpillCallback = RapidsBuffer.defaultSpillCallback): RapidsBufferHandle = {
-    deviceStorage.addTable(table, contigBuffer, tableMeta, initialSpillPriority)
-  }
-
   /**
    * Adds a contiguous table to the device storage. This does NOT take ownership of the
    * contiguous table, so it is the responsibility of the caller to close it. The refcount of the
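With the ownership-taking addTable overload removed, callers use the catalog path whose doc comment begins above. The sketch below only illustrates the ownership contract it describes; the exact addContiguousTable signature (a priority argument and a RapidsBufferHandle return) is an assumption drawn from that comment and may differ from the actual API.

// Hedged usage sketch only, assumed to sit in the com.nvidia.spark.rapids package so
// RapidsBufferCatalog and RapidsBufferHandle resolve. The caller keeps ownership of
// the ContiguousTable and must close it; the stored data is then reached via the handle.
import ai.rapids.cudf.ContiguousTable

object AddContiguousTableUsageSketch {
  def makeSpillable(contigTable: ContiguousTable, spillPriority: Long): RapidsBufferHandle = {
    // addContiguousTable does NOT take ownership, so the caller still closes
    // contigTable (for example with withResource) once this call returns.
    RapidsBufferCatalog.addContiguousTable(contigTable, spillPriority)
  }

  // later, when the data is no longer needed:
  //   handle.close()   // closing the last handle removes the buffer from the catalog
}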