Add in a GpuMemoryLeaseManager #7361
Conversation
Signed-off-by: Robert (Bobby) Evans <[email protected]>
It would be nice to have a description here and in the code as to what it is, how it works, and how it replaces the semaphore.
 * Initializes the GPU memory lease manager
 * @param savedForShuffle the amount of memory that is saved for shuffle
 */
def initialize(savedForShuffle: Long): Unit = synchronized {
Do we want to call it reserved memory or something similarly more generic? We may come up with other situations outside of shuffle that need memory set aside from this. Or are you thinking we will add more parameters to this for each use-case that needs memory reserved in the future?
Along those same lines, I'm also wondering if it's this class's responsibility to calculate the memory amount to manage or the one calling it. The benefit of having the caller specify the amount of memory to manage is that it makes it easier to unit test and reduces the coupling to other things (like shuffle memory reservation).
If you want me to refactor it so we only have a pool size passed in I am fine with it.
I do think it would be cleaner to move this out. As it is now, the caller could ask for more memory to be reserved than there is memory available on the GPU, for example, which isn't covered. By having the caller calculate the amount of memory for the lease manager to manage, I think we might be able to more cleanly handle these error cases and may be able to remove the need for standard vs. testing init.
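To make that alternative concrete, here is a minimal sketch of a pool-size-only initialize where the caller computes the amount to manage; the names and structure are illustrative assumptions, not the PR's actual code.

// Hypothetical sketch, not the PR implementation: the lease manager only receives the
// exact pool size it should manage, and the caller handles GPU-total and reservation math.
object GpuMemoryLeaseManagerSketch {
  private var memoryForLease: Long = -1L

  // Initialize with the exact amount of memory to manage. The caller subtracts anything
  // reserved for shuffle (or other uses) and validates against total GPU memory first.
  def initialize(poolSize: Long): Unit = synchronized {
    require(poolSize > 0, s"Pool size must be positive, got $poolSize")
    require(memoryForLease < 0, "GpuMemoryLeaseManager is already initialized")
    memoryForLease = poolSize
  }
}

// Caller side (e.g. plugin startup), computing the pool from what it already knows:
//   val poolSize = totalGpuMemory - reservedShuffleMemory
//   GpuMemoryLeaseManagerSketch.initialize(poolSize)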
assert(lease.leaseAmount == 900)
child.start()
// The child should block so give it some time to come up...
Thread.sleep(100)
Sleeps are almost always an indication of a flaky test, especially a sleep that's this small. One full-GC hiccup could easily make this essentially a sleep(0) in practice, which means the test doesn't actually test what we want it to, even though it passes this check.
yes, I was being lazy. Thanks for making me fix it.
This test is still racy, since the child could fail to block on a lease (or still be in the middle of calculating whether or not it should block) when the main thread is checking for the result of getting a lease. Completely eliminating the race probably will require exposing the internals of the lease manager to some degree so the test can verify a specific thread has indeed been blocked by the lease manager (e.g.: add a test-only predicate function to the lease manager that can be used to see if a particular thread is known to be blocked by the manager).
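One way to remove the race along those lines is a test-visible predicate that reports whether a task is currently blocked in the manager, so the test polls for that state instead of sleeping. A rough sketch, with hypothetical names:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical test-only hook, not the actual PR API: the manager records which task
// attempts are currently parked waiting for a lease, so a test can poll for that state
// instead of sleeping for a fixed amount of time.
object LeaseManagerTestHookSketch {
  private val blockedTasks = new ConcurrentHashMap[Long, java.lang.Boolean]()

  // The manager would call these right before and after parking a task.
  def markBlocked(taskAttemptId: Long): Unit = blockedTasks.put(taskAttemptId, true)
  def markUnblocked(taskAttemptId: Long): Unit = blockedTasks.remove(taskAttemptId)

  // Test-visible predicate: is this task attempt currently blocked by the manager?
  def isTaskBlocked(taskAttemptId: Long): Boolean = blockedTasks.containsKey(taskAttemptId)

  // Test helper: wait until the task is actually blocked, or fail after a timeout.
  def waitUntilBlocked(taskAttemptId: Long, timeoutMs: Long = 10000): Unit = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    while (!isTaskBlocked(taskAttemptId) && System.nanoTime() < deadline) {
      Thread.sleep(10)
    }
    assert(isTaskBlocked(taskAttemptId), s"task $taskAttemptId never blocked on a lease")
  }
}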
@jlowe @tgravescs I think I have addressed all of the current issues, but I am happy to make more changes if needed.
Moved this to draft because I want to get some feedback from @abellina
So one concern I have is that we now potentially override what users had set for spark.rapids.sql.concurrentGpuTasks. I specifically know of one customer using T4s that has this set to 3, and it performed better than 2 with just the default batch size. Perhaps we need some override or logic when it's explicitly set, until we get more logic to be smarter throughout.
It would be interesting to get some NDS runs on smaller GPUs to see if it has a performance impact there as well.
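One possible guard along those lines, sketched with the standard SparkConf API; the helper name and the computed default are assumptions for illustration, not the plugin's actual logic.

import org.apache.spark.SparkConf

// Sketch only: respect an explicitly set spark.rapids.sql.concurrentGpuTasks and only use
// the lease-manager-derived value when the user did not set one themselves.
object ConcurrentGpuTasksSketch {
  def resolve(sparkConf: SparkConf, computedDefault: Int): Int = {
    val key = "spark.rapids.sql.concurrentGpuTasks"
    if (sparkConf.contains(key)) {
      // e.g. the T4 customer explicitly running with 3 keeps their setting
      sparkConf.getInt(key, computedDefault)
    } else {
      computedDefault
    }
  }
}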
 * Be aware that the numbers here are just made up, so only take this as an example of what
 * could be done, and not exactly what will be done.
 * <br/>
 * An ORC write can take a number of different forms. We are doing to start with the simplest
Suggested change:
- * An ORC write can take a number of different forms. We are doing to start with the simplest
+ * An ORC write can take a number of different forms. We are going to start with the simplest
 * the operation - 4 GiB available to use = 5 GiB more needed to complete the operation.
 * Because we need more memory to finish the task we can make the input batch spillable
 * and call `requestLease(taskContext, 0, moreNeeded)` where the `moreNeeded` holds 5 GiB, the
 * result of the math, and is the `requiredAmount` parameter to `requestLEase`. The method may
Suggested change:
- * result of the math, and is the `requiredAmount` parameter to `requestLEase`. The method may
+ * result of the math, and is the `requiredAmount` parameter to `requestLease`. The method may
 * memory to fulfill this request an exception will be thrown.
 * @return a MemoryLease that allows the task to allocate more memory.
 */
def requestLease(tc: TaskContext, optionalAmount: Long, requiredAmount: Long = 0): MemoryLease =
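Turning the worked example in the doc comment into code, here is a hedged sketch of a caller; `makeSpillable`, `writeOrc`, and closing the lease to release it are assumptions for illustration, and the sizes are the made-up numbers from the example.

import org.apache.spark.TaskContext
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch only, not the PR's actual code.
object RequestLeaseExample {
  def writeOrcWithLease(tc: TaskContext, inputBatch: ColumnarBatch): Unit = {
    val estimatedNeeded = 9L << 30                     // 9 GiB the operation is estimated to need
    val availableToUse = 4L << 30                      // 4 GiB the task can already use
    val moreNeeded = estimatedNeeded - availableToUse  // 5 GiB still required

    makeSpillable(inputBatch)                          // let other tasks progress if we block

    // Block until the required 5 GiB can be leased; this may throw if the pool can never
    // satisfy the request.
    val lease = GpuMemoryLeaseManager.requestLease(tc, 0, moreNeeded)
    try {
      writeOrc(inputBatch)                             // the memory-hungry part of the work
    } finally {
      lease.close()                                    // hand the memory back (assuming the lease is closeable)
    }
  }

  // Hypothetical helpers standing in for the real spill and write machinery.
  private def makeSpillable(batch: ColumnarBatch): Unit = ()
  private def writeOrc(batch: ColumnarBatch): Unit = ()
}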
Naming ideas: use maximumAmount instead of optionalAmount and minimumAmount instead of requiredAmount? That seems clearer than optional/required, and if you specify both maximumAmount and minimumAmount, then maximumAmount would be greater than minimumAmount. The existing parameters being independent (e.g. "this is on top of any required amount of memory") seems confusing.
I think this API suggestion makes it clearer as well and ports well to the non-blocking lease.
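To make the naming suggestion concrete, a sketch of what the signature could look like; this is not the final API.

// Sketch of the proposed naming, not the actual API: minimumAmount must be satisfiable
// before returning, maximumAmount is the most the manager should try to grant, and the
// two describe a range rather than independent amounts.
def requestLease(
    tc: TaskContext,
    maximumAmount: Long,
    minimumAmount: Long = 0): MemoryLease = {
  require(maximumAmount >= minimumAmount,
    s"maximumAmount ($maximumAmount) must be at least minimumAmount ($minimumAmount)")
  // ... block until at least minimumAmount is available, grant up to maximumAmount ...
  ???
}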
} else {
  0
}
GpuMemoryLeaseManager.initialize(reservedShuffleMemory)
val concurrentGpuTasks = conf.concurrentGpuTasks
logInfo(s"The number of concurrent GPU tasks allowed is $concurrentGpuTasks")
Nit: could we include in this line the config for the memory lease manager (or ask it to log its own line)? It should include the total amount of memory it sees, and optionally the reserve amount it discounted.
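A possible shape for that log line, purely as a sketch; `totalManagedMemory` is a hypothetical accessor on the lease manager.

// Sketch only: log the lease manager's view of memory next to the concurrency setting.
logInfo(s"The number of concurrent GPU tasks allowed is $concurrentGpuTasks; " +
  s"GpuMemoryLeaseManager is managing ${GpuMemoryLeaseManager.totalManagedMemory} bytes " +
  s"($reservedShuffleMemory bytes reserved for shuffle)")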
val alreadyRequested = getTotalLease(tc)
require(requiredAmount + alreadyRequested <= memoryForLease,
  s"Task: $taskAttemptId requested at least $requiredAmount more bytes, but already has " +
  s"leased $alreadyRequested bytes which would got over the total for the worker " +
Suggested change:
- s"leased $alreadyRequested bytes which would got over the total for the worker " +
+ s"leased $alreadyRequested bytes which would go over the total for the executor " +
} else if (totalAmountRequested + alreadyRequested > memoryForLease) {
  logWarning(s"Task: $taskAttemptId requested $totalAmountRequested bytes, but has " +
    s"already requested $alreadyRequested bytes. This would go over the total for the " +
    s"worker $memoryForLease reducing the request on the hope that this was an " +
Suggested change:
- s"worker $memoryForLease reducing the request on the hope that this was an " +
+ s"executor $memoryForLease reducing the request on the hope that this was an " +
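For reference, the trimming that warning describes could look roughly like this, assuming only the optional portion of the request is reduced:

// Sketch only: the required amount already passed the require check above, so trimming the
// combined request down to what the pool can still give only shrinks the optional part.
val grantable = memoryForLease - alreadyRequested
val grantedAmount = math.min(totalAmountRequested, grantable)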
|
def getTotalLease(tc: TaskContext): Long = synchronized {
  val taskAttemptId = tc.taskAttemptId()
  val data = activeTasks.get(taskAttemptId)
Nit: instead of `data` here and below in `releaseLease` we could use `task` as we did above.
val taskAttemptId = tc.taskAttemptId()
val data = activeTasks.get(taskAttemptId)
if (data != null) {
  // For now we are just going to ignore that it is gone. Could be a race with releaseAllForTask
I think if we lose this race we should throw. It seems no code should call `releaseLease` if the task is complete already.
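Along the lines of that comment, the release path could fail fast instead of ignoring the missing entry; a sketch:

// Sketch only: treat releasing a lease for an unknown or already-completed task as a bug
// rather than a race to silently ignore.
val data = activeTasks.get(taskAttemptId)
if (data == null) {
  throw new IllegalStateException(
    s"releaseLease called for task $taskAttemptId which has no active lease state " +
      "(was releaseAllForTask already called?)")
}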
I checked in some code that tries to address issues with users configuring the concurrent number of tasks but not setting the targetBatchSize when tuning their jobs. It is not great, but I checked it in just in case my desktop dies while I am off for the holidays.
Closing in favor of a different approach.
This depends on rapidsai/cudf#12390
This fixes #7253
It is the first step towards better memory coordination in the plugin.