Merge pull request #2425 from NVIDIA/bot-auto-merge-branch-24.10
[auto-merge] bot-auto-merge-branch-24.10 to branch-24.12 [skip ci] [bot]
nvauto authored Sep 24, 2024
2 parents 2acf86a + 9392c4f commit 51ef478
Showing 4 changed files with 88 additions and 18 deletions.
55 changes: 46 additions & 9 deletions src/main/cpp/src/SparkResourceAdaptorJni.cpp
@@ -24,6 +24,7 @@
#include <spdlog/sinks/ostream_sink.h>
#include <spdlog/spdlog.h>

#include <algorithm>
#include <chrono>
#include <exception>
#include <map>
@@ -203,6 +204,8 @@ struct task_metrics {
// The amount of time that this thread has lost due to retries (not including blocked time)
long time_lost_nanos = 0;

long gpu_max_memory_allocated = 0;

void take_from(task_metrics& other)
{
add(other);
@@ -215,6 +218,8 @@ struct task_metrics {
this->num_times_split_retry_throw += other.num_times_split_retry_throw;
this->time_blocked_nanos += other.time_blocked_nanos;
this->time_lost_nanos += other.time_lost_nanos;
this->gpu_max_memory_allocated =
std::max(this->gpu_max_memory_allocated, other.gpu_max_memory_allocated);
}

void clear()
@@ -295,6 +300,8 @@ class full_thread_state {
// time)
long time_retry_running_nanos = 0;
std::chrono::time_point<std::chrono::steady_clock> block_start;
long gpu_memory_allocated_bytes = 0;

// metrics for the current thread
task_metrics metrics;

@@ -799,6 +806,11 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
return get_and_reset_metric(task_id, &task_metrics::time_lost_nanos);
}

long get_and_reset_gpu_max_memory_allocated(long const task_id)
{
return get_and_reset_metric(task_id, &task_metrics::gpu_max_memory_allocated);
}

void check_and_break_deadlocks()
{
std::unique_lock<std::mutex> lock(state_mutex);
@@ -807,7 +819,6 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {

bool cpu_prealloc(size_t const amount, bool const blocking)
{
// amount is not used yet, but is here in case we want it in the future.
std::unique_lock<std::mutex> lock(state_mutex);
auto const thread_id = static_cast<long>(pthread_self());
return pre_alloc_core(thread_id, true, blocking, lock);
@@ -820,10 +831,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
{
// addr is not used yet, but is here in case we want it in the future.
// amount is not used yet, but is here in case we want it for debugging/metrics.
// blocking is not used yet. It could be used for some debugging so we are keeping it.
std::unique_lock<std::mutex> lock(state_mutex);
auto const thread_id = static_cast<long>(pthread_self());
post_alloc_success_core(thread_id, true, was_recursive, lock);
post_alloc_success_core(thread_id, true, was_recursive, amount, lock);
}

bool cpu_postalloc_failed(bool const was_oom, bool const blocking, bool const was_recursive)
@@ -838,7 +848,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
// addr is not used yet, but is here in case we want it in the future.
// amount is not used yet, but is here in case we want it for debugging/metrics.
std::unique_lock<std::mutex> lock(state_mutex);
dealloc_core(true, lock);
dealloc_core(true, lock, amount);
}

/**
@@ -1333,15 +1343,18 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
* `likely_spill` if this allocation should be treated differently, because
* we detected recursion while handling a prior allocation in this thread.
*/
void post_alloc_success(long const thread_id, bool const likely_spill)
void post_alloc_success(long const thread_id,
bool const likely_spill,
std::size_t const num_bytes)
{
std::unique_lock<std::mutex> lock(state_mutex);
post_alloc_success_core(thread_id, false, likely_spill, lock);
post_alloc_success_core(thread_id, false, likely_spill, num_bytes, lock);
}

void post_alloc_success_core(long const thread_id,
bool const is_for_cpu,
bool const was_recursive,
std::size_t const num_bytes,
std::unique_lock<std::mutex>& lock)
{
// pre allocate checks
@@ -1360,6 +1373,14 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
}
transition(thread->second, thread_state::THREAD_RUNNING);
thread->second.is_cpu_alloc = false;
// num_bytes is likely not padded, which could cause slight inaccuracies
// but for now it shouldn't matter for watermark purposes
if (!is_for_cpu) {
thread->second.gpu_memory_allocated_bytes += num_bytes;
thread->second.metrics.gpu_max_memory_allocated =
std::max(thread->second.metrics.gpu_max_memory_allocated,
thread->second.gpu_memory_allocated_bytes);
}
break;
default: break;
}
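
Aside (not part of the diff): the hunk above is where the new metric is maintained. Each successful device allocation adds num_bytes to the thread's running total and raises the task's watermark to the larger of the two, and dealloc_core below subtracts on free. A minimal Java sketch of that bookkeeping, with hypothetical names; the real logic is the C++ shown in this file.

  // Illustrative sketch only; names are hypothetical, the real bookkeeping is in SparkResourceAdaptorJni.cpp.
  class GpuMemoryWatermark {
    private long currentlyAllocated = 0; // running total of live device bytes for one thread
    private long maxAllocated = 0;       // per-task peak that the get-and-reset accessor reports

    void onAllocSuccess(long numBytes) {
      currentlyAllocated += numBytes;
      // numBytes may exclude allocator padding, so the peak is a close approximation
      // rather than an exact device footprint (see the comment in the hunk above).
      maxAllocated = Math.max(maxAllocated, currentlyAllocated);
    }

    void onDealloc(long numBytes) {
      currentlyAllocated -= numBytes;
    }

    long getAndReset() {
      long peak = maxAllocated;
      maxAllocated = 0;
      return peak;
    }
  }
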
@@ -1735,7 +1756,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
bool const likely_spill = pre_alloc(tid);
try {
void* ret = resource->allocate(num_bytes, stream);
post_alloc_success(tid, likely_spill);
post_alloc_success(tid, likely_spill, num_bytes);
return ret;
} catch (rmm::out_of_memory const& e) {
// rmm::out_of_memory is what is thrown when an allocation failed
@@ -1751,7 +1772,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
throw rmm::bad_alloc("Internal Error");
}

void dealloc_core(bool const is_for_cpu, std::unique_lock<std::mutex>& lock)
void dealloc_core(bool const is_for_cpu,
std::unique_lock<std::mutex>& lock,
std::size_t const num_bytes)
{
auto const tid = static_cast<long>(pthread_self());
auto const thread = threads.find(tid);
@@ -1779,6 +1802,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
if (is_for_cpu == t_state.is_cpu_alloc) {
transition(t_state, thread_state::THREAD_ALLOC_FREE);
}
if (!is_for_cpu) { t_state.gpu_memory_allocated_bytes -= num_bytes; }
break;
default: break;
}
@@ -1793,7 +1817,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
// deallocate success
if (size > 0) {
std::unique_lock<std::mutex> lock(state_mutex);
dealloc_core(false, lock);
dealloc_core(false, lock, size);
}
}
};
@@ -2079,6 +2103,19 @@ Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetComputeTimeLost
CATCH_STD(env, 0)
}

JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_getAndResetGpuMaxMemoryAllocated(
JNIEnv* env, jclass, jlong ptr, jlong task_id)
{
JNI_NULL_CHECK(env, ptr, "resource_adaptor is null", 0);
try {
cudf::jni::auto_set_device(env);
auto mr = reinterpret_cast<spark_resource_adaptor*>(ptr);
return mr->get_and_reset_gpu_max_memory_allocated(task_id);
}
CATCH_STD(env, 0)
}

JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_startRetryBlock(
JNIEnv* env, jclass, jlong ptr, jlong thread_id)
{
16 changes: 16 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/RmmSpark.java
@@ -589,6 +589,22 @@ public static long getAndResetComputeTimeLostToRetryNs(long taskId) {
}
}

/**
* Get the max device memory footprint, in bytes, that this task had allocated over its lifetime
* @param taskId the id of the task to get the metric for.
* @return the max device memory footprint.
*/
public static long getAndResetGpuMaxMemoryAllocated(long taskId) {
synchronized (Rmm.class) {
if (sra != null && sra.isOpen()) {
return sra.getAndResetGpuMaxMemoryAllocated(taskId);
} else {
// sra is not set so the value is by definition 0
return 0;
}
}
}
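
A usage sketch (not from this commit): the new accessor is read like the other per-task metrics once a task completes, and it returns 0 when no SparkResourceAdaptor is installed, as the method above shows. The wrapper class below is hypothetical; only RmmSpark.getAndResetGpuMaxMemoryAllocated comes from this change.

  import com.nvidia.spark.rapids.jni.RmmSpark;

  // Hypothetical caller that logs a task's peak device-memory footprint after it finishes.
  public class TaskPeakMemoryReporter {
    public static void reportPeakGpuMemory(long taskId) {
      long peakBytes = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskId);
      System.out.println("Task " + taskId + " peak GPU allocation: " + peakBytes + " bytes");
    }
  }
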

/**
* Called before doing an allocation on the CPU. This could throw an injected exception to help
* with testing.
5 changes: 5 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/SparkResourceAdaptor.java
@@ -251,6 +251,10 @@ public long getAndResetComputeTimeLostToRetry(long taskId) {
return getAndResetComputeTimeLostToRetry(getHandle(), taskId);
}

public long getAndResetGpuMaxMemoryAllocated(long taskId) {
return getAndResetGpuMaxMemoryAllocated(getHandle(), taskId);
}


/**
* Called before doing an allocation on the CPU. This could throw an injected exception to help
@@ -319,6 +323,7 @@ public void cpuDeallocate(long ptr, long amount) {
private static native int getAndResetSplitRetryThrowInternal(long handle, long taskId);
private static native long getAndResetBlockTimeInternal(long handle, long taskId);
private static native long getAndResetComputeTimeLostToRetry(long handle, long taskId);
private static native long getAndResetGpuMaxMemoryAllocated(long handle, long taskId);
private static native void startRetryBlock(long handle, long threadId);
private static native void endRetryBlock(long handle, long threadId);
private static native void checkAndBreakDeadlocks(long handle);
30 changes: 21 additions & 9 deletions src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java
@@ -43,6 +43,8 @@
import static org.junit.jupiter.api.Assertions.fail;

public class RmmSparkTest {
private final static long ALIGNMENT = 256;

@BeforeEach
public void setup() {
if (Rmm.isInitialized()) {
@@ -317,6 +319,7 @@ public void testInsertOOMsGpu() {
assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid));
assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid));
assertEquals(0, RmmSpark.getAndResetComputeTimeLostToRetryNs(taskid));
assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid));
RmmSpark.startDedicatedTaskThread(threadId, taskid, t);
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));
try {
@@ -343,6 +346,7 @@ public void testInsertOOMsGpu() {
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));
assertEquals(1, RmmSpark.getAndResetNumRetryThrow(taskid));
assertEquals(0, RmmSpark.getAndResetNumSplitRetryThrow(taskid));
assertEquals(ALIGNMENT, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid));
RmmSpark.blockThreadUntilReady();

// Allocate something small and verify that it works...
@@ -356,6 +360,7 @@ public void testInsertOOMsGpu() {
assertThrows(GpuSplitAndRetryOOM.class, () -> Rmm.alloc(100).close());
assertEquals(0, RmmSpark.getAndResetNumRetryThrow(taskid));
assertEquals(1, RmmSpark.getAndResetNumSplitRetryThrow(taskid));
assertEquals(ALIGNMENT * 2, RmmSpark.getAndResetGpuMaxMemoryAllocated(taskid));

// Verify that injecting OOM does not cause the block to actually happen
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));
@@ -635,8 +640,8 @@ void setupRmmForTestingWithLimits(long maxAllocSize, RmmEventHandler eventHandle
boolean succeeded = false;
try {
resource = new RmmCudaMemoryResource();
resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, 256);
resource = new RmmTrackingResourceAdaptor<>(resource, 256);
resource = new RmmLimitingResourceAdaptor<>(resource, maxAllocSize, ALIGNMENT);
resource = new RmmTrackingResourceAdaptor<>(resource, ALIGNMENT);
Rmm.setCurrentDeviceResource(resource, null, false);
succeeded = true;
} finally {
@@ -760,9 +765,9 @@ public void testBasicCpuBlocking() throws ExecutionException, InterruptedExcepti

@Test
public void testBasicMixedBlocking() throws ExecutionException, InterruptedException, TimeoutException {
// 10 MiB
setupRmmForTestingWithLimits(10 * 1024 * 1024);
LimitingOffHeapAllocForTests.setLimit(10 * 1024 * 1024);
final long MB = 1024 * 1024;
setupRmmForTestingWithLimits(10 * MB);
LimitingOffHeapAllocForTests.setLimit(10 * MB);
TaskThread taskOne = new TaskThread("TEST THREAD ONE", 1);
TaskThread taskTwo = new TaskThread("TEST THREAD TWO", 2);
TaskThread taskThree = new TaskThread("TEST THREAD THREE", 3);
@@ -771,6 +776,9 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep
taskTwo.initialize();
taskThree.initialize();
taskFour.initialize();

final long FIVE_MB = 5 * MB;
final long SIX_MB = 6 * MB;
try {
long tOneId = taskOne.getThreadId();
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tOneId));
@@ -784,18 +792,18 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep
long tFourId = taskFour.getThreadId();
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(tFourId));

try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, 5 * 1024 * 1024)) {
try (AllocOnAnotherThread firstGpuAlloc = new GpuAllocOnAnotherThread(taskOne, FIVE_MB)) {
firstGpuAlloc.waitForAlloc();

try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, 5 * 1024 * 1024)) {
try (AllocOnAnotherThread firstCpuAlloc = new CpuAllocOnAnotherThread(taskTwo, FIVE_MB)) {
firstCpuAlloc.waitForAlloc();

// Blocking GPU Alloc
try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, 6 * 1024 * 1024)) {
try (AllocOnAnotherThread secondGpuAlloc = new GpuAllocOnAnotherThread(taskThree, SIX_MB)) {
taskThree.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS);

// Blocking CPU Alloc
try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, 6 * 1024 * 1024)) {
try (AllocOnAnotherThread secondCpuAlloc = new CpuAllocOnAnotherThread(taskFour, SIX_MB)) {
taskFour.pollForState(RmmSparkThreadState.THREAD_BLOCKED, 1000, TimeUnit.MILLISECONDS);

// We want to make sure that the order of wakeup corresponds to the location of the data that was released
@@ -814,9 +822,13 @@ public void testBasicMixedBlocking() throws ExecutionException, InterruptedExcep
}
} finally {
taskOne.done();
assertEquals(FIVE_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(1));
taskTwo.done();
assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(2));
taskThree.done();
assertEquals(SIX_MB, RmmSpark.getAndResetGpuMaxMemoryAllocated(3));
taskFour.done();
assertEquals(0, RmmSpark.getAndResetGpuMaxMemoryAllocated(4));
}
}

