
Spill framework refactor for better performance and extensibility #11747

Draft: wants to merge 6 commits into base: branch-25.02
Conversation

abellina (Collaborator):
This is a very large PR that I'd like some 👀 on. I marked it as a draft since I still have some TODOs around more tests. The PR is NOT going to 24.12; it's just that we don't have a 25.02 branch available yet.

The main file to focus on is SpillFramework.scala (yep, one file; let me know if you want me to break it into multiple files). SpillFramework.scala has a comment describing how things should work, so please take a look at that.

The main contribution here is a simplification of the framework: we replace the idea of a RapidsBuffer that has to be acquired and released with the idea of a handle that just knows how to materialize itself. There is no concept of acquisition in the new framework.

There is a SpillableColumnarBatch API and a lazy-spillable API for Join that I left untouched on purpose, but we can start to remove that API and create spillable handles that replicate the lazy behavior we wanted in LazySpillable, or the recomputing behavior we want for broadcasts. This is the second contribution of the PR: handles decide how to spill, not the framework.
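To make the handle idea concrete, here is a minimal sketch of the shape described above. sizeInBytes, spill, and materialize mirror names that appear later in this review; everything else is illustrative, not the PR's exact API:

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

trait SpillableHandleSketch extends AutoCloseable {
  // approximate size, used by the stores to pick spill candidates
  val sizeInBytes: Long

  // move this handle's data down one tier (device -> host -> disk);
  // returns sizeInBytes if it spilled, 0 otherwise
  def spill: Long
}

trait ColumnarBatchHandleSketch extends SpillableHandleSketch {
  // materialize is deliberately not on the base trait: producing the
  // concrete object may need extra inputs, e.g. the Spark schema,
  // because a packed buffer alone cannot become a ColumnarBatch
  def materialize(dataTypes: Array[DataType]): ColumnarBatch
}

There is no acquire/release pair anywhere: a consumer calls materialize, gets a ref-counted object back, and simply closes it when done.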

There is one easily fixable shortcoming today in the multiple-spiller case that I will fix in a follow-on PR. While we are spilling a handle, the handle holds a lock. The same lock is used to figure out whether the handle is spillable, so a second thread that is trying to spill may need to wait for this lock (and the spill) to finish just to decide whether it needs to spill that handle at all. We can make this more straightforward by tracking the spill state separately from the materialization/data state, but I'd like to submit that work as a follow-up improvement.
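As a sketch of that follow-up (illustrative only, not code in this PR), the spillability check can be made lock-free by tracking spill state separately from the data; all names here are hypothetical:

import java.util.concurrent.atomic.AtomicReference

object SpillState extends Enumeration {
  val Resident, Spilling, Spilled = Value
}

trait NonBlockingSpillCheck {
  private val state = new AtomicReference(SpillState.Resident)

  // cheap check that a second spilling thread can make without
  // waiting on in-flight spill IO
  def spillable: Boolean = state.get() == SpillState.Resident

  // only the thread that wins the CAS performs the (locked, slow) IO
  def trySpill(doIo: () => Long): Long =
    if (state.compareAndSet(SpillState.Resident, SpillState.Spilling)) {
      val bytes = doIo()
      state.set(SpillState.Spilled)
      bytes
    } else {
      0L // someone else is spilling it, or it already spilled
    }
}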

I have run this against NDS @ 3TB on our perf cluster and I don't see regressions. I have also run it against spill-prone cases, where I can see multiple threads in the "spill path" and no deadlocks. I'll post more results when I can run them.

@@ -44,22 +44,20 @@ class RapidsSerializerManager (conf: SparkConf) {

private lazy val compressionCodec: CompressionCodec = TrampolineUtil.createCodec(conf)
abellina (Author):
I'd like to remove this class, or make it much simpler.

val amountSpilled = store.spill(allocSize)
retryState.resetIfNeeded(retryCount, amountSpilled > 0)
logInfo(s"Device allocation of $allocSize bytes failed. " +
s"Device store spilled $amountSpilled bytes. $attemptMsg " +
Member:
attemptMsg already has the trailing space.

Suggested change:
- s"Device store spilled $amountSpilled bytes. $attemptMsg " +
+ s"Device store spilled $amountSpilled bytes. $attemptMsg" +

*
* Every store handle supports a `materialize` method that isn't part of the interface.
* The reason is that to materialize certain objects, you may need some data (for example,
* spark schema descriptors). `materialize` incRefCounts the object if it's resident in the
Member:
Suggested change:
- * spark schema descriptors). `materialize` incRefCounts the object if it's resident in the
+ * Spark schema descriptors). `materialize` incRefCounts the object if it's resident in the

*
* We hold the handle lock when we are spilling (performing IO). That means that no other consumer
* can access this spillable device handle while it is being spilled, including a second thread
* trying to spill and asking each of the handles wether they are spillable or not, as that requires
Member:
Suggested change:
- * trying to spill and asking each of the handles wether they are spillable or not, as that requires
+ * trying to spill and asking each of the handles whether they are spillable or not, as that requires

* on the device.
*/
trait DeviceSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
var dev: Option[T]
Member:
We don't want to invite handle clients to access this directly.

Suggested change:
- var dev: Option[T]
+ protected var dev: Option[T]

abellina (Author):
For access to these variables: they come in really handy in the spill framework suite, without having to mock things or make those tests too complicated. I'd like to try using private[spill] and moving this and the suite to a spill package. Thoughts?

Member:
private[spill] works for me.

* on the host.
*/
trait HostSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
var host: Option[T]
Member:
Suggested change:
- var host: Option[T]
+ protected var host: Option[T]

Comment on lines 205 to 207
builder.copyNext(n, Cuda.DEFAULT_STREAM)
// we are calling chunked packer on `bb` again each time, we need
// to synchronize before we ask for the next chunk
Member:
We may want to change the method name to something like copyNextAsync so it's clear this is an asynchronous operation. Otherwise it wasn't immediately clear to me why we need a sync after this.

abellina (Author):
That's a nice catch. I need to make sure this is not async, because it currently asks for a bounce buffer in the disk case. Let me fix this.

* obtained during the iterator-like 'next(DeviceMemoryBuffer)'
*/
class ChunkedPacker(table: Table,
bounceBufferSize: Long)
Member:
It might be cleaner if we can make ChunkedPacker a true iterator. It feels a little odd that it needs to be told about the size and then it's the caller's responsibility to later pass buffers of exactly that size to the next method, especially when we have methods that are handed a ChunkedPacker instance. How are they supposed to know the correct size for that instance?

Instead we could create a bounce buffer pool interface that looks something like this:

trait DeviceBounceBufferPool {
  // size of every buffer in the pool
  val bufferSize: Long

  // Returns the next available buffer in the pool,
  // blocking until one is available if necessary.
  // Closing the resulting buffer will release it back to the pool.
  def nextBuffer(): DeviceMemoryBuffer
}

Then ChunkedPacker could be constructed with a device buffer pool (potentially just a single-buffer pool in the degenerate case), which allows it to be a true iterator. next() returns a device buffer from the pool containing the next chunk of contents, and when the caller closes that buffer, it is released back to the pool for reuse.
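A rough sketch of that shape, assuming the DeviceBounceBufferPool trait above and cudf's chunked-pack API (Table.makeChunkedPack and ChunkedPack.next); treat those exact signatures as assumptions:

import ai.rapids.cudf.{DeviceMemoryBuffer, Table}

class IteratorChunkedPacker(table: Table, pool: DeviceBounceBufferPool)
    extends Iterator[(DeviceMemoryBuffer, Long)] with AutoCloseable {
  private val packer = table.makeChunkedPack(pool.bufferSize)

  override def hasNext: Boolean = packer.hasNext

  // Fill the next pool buffer and also return the bytes written, since
  // the last chunk may only partially fill the bounce buffer. Closing
  // the returned buffer releases it back to the pool per the contract.
  override def next(): (DeviceMemoryBuffer, Long) = {
    val bounce = pool.nextBuffer()
    val written = packer.next(bounce)
    (bounce, written)
  }

  override def close(): Unit = packer.close()
}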

}
}

def getHostBuffer: Option[HostMemoryBuffer] = synchronized {
Member:
Suggested change:
- def getHostBuffer: Option[HostMemoryBuffer] = synchronized {
+ private def getHostBuffer: Option[HostMemoryBuffer] = synchronized {

Although I'm not sure this method is worth keeping separate, given that it's only called in one place and that place blindly assumes the result is not None.

abellina (Author):
I made this private[spill] by mistake before; making it just private.

class SpillableColumnarBatchHandle private (
override val sizeInBytes: Long,
override var dev: Option[ColumnarBatch],
var host: Option[SpillableHostBufferHandle] = None)
Member:
Suggested change:
- var host: Option[SpillableHostBufferHandle] = None)
+ private var host: Option[SpillableHostBufferHandle] = None)

revans2 (Collaborator) left a comment:

I didn't finish, but things keep changing under me, so I thought I would publish what I have so far.

true
}

shouldRetry
Collaborator:
nit: why is this shouldRetry change in there? I assume it was for debugging at some point.

@@ -1110,7 +1110,7 @@ class CudfSpillableHostConcatResult(
val hmb: HostMemoryBuffer) extends SpillableHostConcatResult {

override def toBatch: ColumnarBatch = {
-    closeOnExcept(buffer.getHostBuffer()) { hostBuf =>
+    closeOnExcept(buffer.getHostBuffer) { hostBuf =>
Collaborator:
nit: why change this?

@@ -307,7 +307,6 @@ class LazySpillableColumnarBatchImpl(
spill = Some(SpillableColumnarBatch(cached.get,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
} finally {
-      // Putting data in a SpillableColumnarBatch takes ownership of it.
Collaborator:
Is this no longer true?

abellina (Author):
I was debugging something here and must have forgotten to undo this change. Fix coming up.

abellina (Author):
But to answer your question: the spill framework takes ownership, always.

abellina (Author):
Sorry for the rapid movement, @revans2. I'll pause for a bit and come back to address the ChunkedPacker comments and the other comments I get.

zpuller (Collaborator) left a comment:
Likewise, leaving a partial review.

0L
} else {
synchronized {
if (disk.isEmpty) {
Collaborator:
In the else case (disk not empty), it seems we just do nothing (assume it has already spilled) but still return sizeInBytes. Is it possible this leads to double accounting of how much we've spilled?

}
}
disk = Some(diskHandleBuilder.build)
disk
Collaborator:
Is this return value being used anywhere? Can we just omit this line?

dev.getOrElse {
throw new IllegalStateException(
s"$this is spillable but it doesn't have a dev buffer!")
}.getRefCount == 1
Collaborator:
This logic is basically identical between device and host. Should we consider making a shared templated implementation to dedupe this?
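For instance, a hedged sketch of the shared piece, where the trait and refCountIsOne are hypothetical names and the ref-count accessor is passed in:

trait RefCountChecks {
  // fail loudly if the buffer is gone, otherwise report whether we hold
  // the only reference; works for both device and host handles
  protected def refCountIsOne[T](resource: Option[T], owner: => String)
      (refCount: T => Int): Boolean =
    refCount(resource.getOrElse {
      throw new IllegalStateException(
        s"$owner is spillable but it doesn't have a buffer!")
    }) == 1
}

// device handle: refCountIsOne(dev, this.toString)(_.getRefCount)
// host handle:   refCountIsOne(host, this.toString)(_.getRefCount)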

* @tparam T an auto closeable subclass. `host` tracks an instance of this object,
* on the host.
*/
trait HostSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
Collaborator:
Should DeviceSpillableHandle and HostSpillableHandle be consolidated into something like MaterializeableSpillableHandle? They can be templated on the buffer type and otherwise look about identical. I made a similar comment below about consolidating some repeated code.
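Something like this, purely illustrative (the trait name and the resource field are made up; SpillableHandle is assumed from the PR):

trait MaterializableSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
  // the tracked payload, whether it lives on the device or on the host
  protected var resource: Option[T]

  // shared cleanup that the device and host variants currently duplicate
  protected def releaseResource(): Unit = synchronized {
    resource.foreach(_.close())
    resource = None
  }
}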

abellina changed the base branch from branch-24.12 to branch-25.02 on November 25, 2024 at 19:59.
revans2 (Collaborator) left a comment:

I still have a lot more to go through, but I thought I would at least get some of my comments in.

*
* @param blockMap mapping of block ID to array of buffers for the block
*/
private case class ShuffleInfo(
Collaborator:
nit: there is a single value in this class. Would it be better to just alias ShuffleInfo to the ConcurrentHashMap...?

* Set a new spill priority.
*/
override def setSpillPriority(priority: Long): Unit = {
// TODO: handle.setSpillPriority(priority)
Collaborator:
Or remove this entirely.

// closing my reference
handle.close()
}
// TODO this is causing problems so we need to look into this
Collaborator:
Should we just remove this, then? It feels like that should be a separate issue (like 10161), as this is already a large enough PR.

Collaborator:
Okay, sorry, it looks like these comments were here already. This is just copied from previous code.

}
// TODO this is causing problems so we need to look into this
Collaborator:
Again, this PR is big enough. I think we can do this in a separate PR.

Collaborator:
Okay, sorry, it looks like these comments were here already. This is just copied from previous code.

@@ -245,18 +338,29 @@ object SpillableColumnarBatch {
*/
def apply(batch: ColumnarBatch,
priority: Long): SpillableColumnarBatch = {
Cuda.DEFAULT_STREAM.sync()
Collaborator:
why?

RapidsBufferCatalog.addBatch(batch, initialSpillPriority)
}
}
Cuda.DEFAULT_STREAM.sync()
Collaborator:
again why?

catalog: RapidsBufferCatalog): RapidsBufferHandle = {
withResource(batch) { batch =>
catalog.addBatch(batch, initialSpillPriority)
val handle = SpillableHostColumnarBatchHandle(batch)
Collaborator:
Why no stream sync if it is needed for the other APIs?

val handle = withResource(buffer) { _ =>
RapidsBufferCatalog.addBuffer(buffer, meta, priority)
}
Cuda.DEFAULT_STREAM.sync()
Collaborator:
why?

revans2 (Collaborator) left a comment:
Still have a lot more to look at, but I am making progress

buffOffset += blockRange.rangeSize()
}
needsCleanup = false
} catch {
-      case ioe: IOException =>
+      case ex: Throwable =>
Collaborator:
Why do we need to catch Errors?

class ShuffleBufferCatalogSuite extends AnyFunSuite with MockitoSugar {
// TODO: AB: more tests please
Collaborator:
Is this going to be done?

}, hostCols.head.getRowCount.toInt), dataTypes)
}

// TODO: AB: add tests that span multiple byte buffers for host->disk, and
Collaborator:
Is this going to be done?

revans2 (Collaborator) left a comment:
Still not done, but I need to switch to some other things, so I am going to comment on what I have looked at so far.

*
* CUDA/Host synchronization:
*
* We assume all device backed handles are completely materialized on the device (before adding
Collaborator:
Why do we need this? Please add that to the comments so that it is clear what is happening.

* with extra locking is the `SpillableHostStore`, to maintain a `totalSize` number that is
* used to figure out cheaply when it is full.
*
* Handles hold a lock to protect the user against when it is either in the middle of
Collaborator:
This is a bit confusing. So essentially the lock prevents race conditions when the object is in the middle of spilling or being closed?

* - If sizeInBytes is 0, the object is tracked by the stores so it can be
* removed on shutdown, or by handle.close, but 0-byte handles are not spillable.
*/
val sizeInBytes: Long
Collaborator:
If this is approximate, we should name it as such so there is no confusion about how it can be used.

Collaborator:
Also, this is a val, but if something spills, wouldn't it change to 0 from whatever it was before? Shouldn't it be a def?

* or directly against the handle.
* @return sizeInBytes if spilled, 0 for any other reason (not spillable, closed)
*/
def spill: Long
Collaborator:
nit: I thought the convention in Scala was that a method with no parameters should have parens if it does work (like side effects). Should this be def spill(): Long, then?

/**
* Method called to spill this handle. It can be triggered from the spill store,
* or directly against the handle.
* @return sizeInBytes if spilled, 0 for any other reason (not spillable, closed)
Collaborator:
Just to clarify: do we want them to return the result of calling sizeInBytes, especially if it is just an approximate size? Can we clarify a little bit here what is expected to be returned by this API?

* is a `val`.
* @return true if currently spillable, false otherwise
*/
private[spill] def spillable: Boolean = sizeInBytes > 0
Collaborator:
Why is this private to spill? Just curious, because if we want others to add spillable things, does that mean they have to put the handles in com.nvidia.spark.rapids.spill to make them work?

// do we care if:
// handle didn't fit in the store as it is too large.
// we made memory for this so we are going to hold our noses and keep going
// we could spill `handle` at this point.
Collaborator:
I thought we spoke about this already. There are currently two cases:

  1. We are running with no limit on host memory, but with a spill store limit.
  2. We have a host memory limit.

If we have a host memory limit, then the spill store is constrained by the host memory limit, so we can ignore it here; we will not have allocated a HostMemoryBuffer that is too large to fit.

If we have unlimited host memory, then making a SpillableHostBufferHandle is there for code compatibility, but it should not be added to the host store at all. It should never spill; we have unlimited host memory.

We should also document this somewhere.
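A hedged sketch of that rule as described above; hostMemoryLimited, hostStore, and track are hypothetical names, not this PR's API:

def maybeTrack(handle: SpillableHostBufferHandle, hostMemoryLimited: Boolean): Unit = {
  if (hostMemoryLimited) {
    // case 2: the allocation already fit under the host limit, so the
    // store (bounded by that same limit) can always accept the handle
    hostStore.track(handle)
  } else {
    // case 1: unlimited host memory; the handle exists only for code
    // compatibility and is never registered, so it never spills
  }
}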
