Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use JNI pinned pool resource with cuIO #15255

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,31 @@ private static void freeInternal(long address, long origLength) {
* Initialize the pool.
*
* @param poolSize size of the pool to initialize.
* @note when using this method, the pinned pool will be shared with cuIO
*/
public static synchronized void initialize(long poolSize) {
// NOTE(review): the two-argument call below looks like the removed (pre-change) side of
// this diff hunk; only the three-argument call is expected in the merged file — confirm.
initialize(poolSize, -1);
// Delegate with gpuId = -1 (use the default device) and cuIO sharing enabled (true).
initialize(poolSize, -1, true);
}

/**
* Initialize the pool.
*
* @param poolSize size of the pool to initialize.
* @param gpuId gpu id to set to get memory pool from, -1 means to use default
* @note when using this method, the pinned pool will be shared with cuIO
*/
public static synchronized void initialize(long poolSize, int gpuId) {
  // This overload always hands the pinned pool to cuIO as its host memory resource.
  final boolean shareWithCuio = true;
  initialize(poolSize, gpuId, shareWithCuio);
}

/**
* Initialize the pool.
*
* @param poolSize size of the pool to initialize.
* @param gpuId gpu id to set to get memory pool from, -1 means to use default
* @param setCuioHostMemoryResource true if this pinned pool should be used by cuIO for host memory
*/
public static synchronized void initialize(long poolSize, int gpuId, boolean setCuioHostMemoryResource) {
if (isInitialized()) {
throw new IllegalStateException("Can only initialize the pool once.");
}
Expand All @@ -126,7 +139,7 @@ public static synchronized void initialize(long poolSize, int gpuId) {
t.setDaemon(true);
return t;
});
initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize, gpuId));
initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize, gpuId, setCuioHostMemoryResource));
initService.shutdown();
}

Expand Down Expand Up @@ -203,13 +216,16 @@ public static long getTotalPoolSizeBytes() {
return 0;
}

private PinnedMemoryPool(long poolSize, int gpuId) {
private PinnedMemoryPool(long poolSize, int gpuId, boolean setCuioHostMemoryResource) {
if (gpuId > -1) {
// set the gpu device to use
Cuda.setDevice(gpuId);
Cuda.freeZero();
}
this.poolHandle = Rmm.newPinnedPoolMemoryResource(poolSize, poolSize);
if (setCuioHostMemoryResource) {
Rmm.setCuioPinnedPoolMemoryResource(this.poolHandle);
}
this.poolSize = poolSize;
}

Expand Down
8 changes: 8 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Rmm.java
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,17 @@ static native long newEventHandlerResourceAdaptor(long handle, long trackerHandl

/**
 * Create a native pinned pool memory resource.
 *
 * @param initSize initial size of the pool in bytes
 * @param maxSize maximum size of the pool in bytes
 * @return native handle to the pool, to be passed to the other pinned pool methods
 */
public static native long newPinnedPoolMemoryResource(long initSize, long maxSize);

/**
 * Wrap the given pinned pool in a fallback resource and install it as the host memory
 * resource used by cuIO, remembering the resource cuIO used before.
 *
 * NOTE(review): this is declared to return long, but the native implementation
 * (Java_ai_rapids_cudf_Rmm_setCuioPinnedPoolMemoryResource) is declared void, so the
 * returned value is not meaningful — confirm and align the two declarations.
 *
 * @param poolPtr native handle returned by newPinnedPoolMemoryResource
 */
public static native long setCuioPinnedPoolMemoryResource(long poolPtr);

/**
 * Restore cuIO's previous host memory resource, drop the fallback wrapper, and delete
 * the native pinned pool.
 *
 * @param poolPtr native handle returned by newPinnedPoolMemoryResource
 */
public static native void releasePinnedPoolMemoryResource(long poolPtr);

/**
 * Allocate from the pinned pool.
 * (Native body not visible here; presumably returns the address of the allocation — confirm.)
 *
 * @param poolPtr native handle returned by newPinnedPoolMemoryResource
 * @param size number of bytes to allocate
 */
public static native long allocFromPinnedPool(long poolPtr, long size);

/**
 * Return memory to the pinned pool.
 *
 * @param poolPtr native handle returned by newPinnedPoolMemoryResource
 * @param ptr address previously returned by allocFromPinnedPool
 * @param size size in bytes of the allocation being freed
 */
public static native void freeFromPinnedPool(long poolPtr, long ptr, long size);

// only for tests
// Allocates `size` bytes through whatever host memory resource cuIO currently uses and
// returns the address as a long.
public static native long allocFromFallbackPinnedPool(long size);

// only for tests
// Frees `size` bytes at `ptr` through whatever host memory resource cuIO currently uses.
public static native void freeFromFallbackPinnedPool(long ptr, long size);
}
}
221 changes: 221 additions & 0 deletions java/src/main/native/src/RmmJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <limits>
#include <mutex>

#include <cudf/io/memory_resource.hpp>
#include <rmm/mr/device/aligned_resource_adaptor.hpp>
#include <rmm/mr/device/arena_memory_resource.hpp>
#include <rmm/mr/device/cuda_async_memory_resource.hpp>
Expand Down Expand Up @@ -366,6 +367,187 @@ class java_debug_event_handler_memory_resource final : public java_event_handler
}
};

/**
 * @brief Accessor for the host memory resource cuIO was using before ours was installed.
 *
 * The stored reference is seeded on first call with cuIO's current host memory resource;
 * callers may overwrite it through the returned reference.
 *
 * @return Mutable reference to the saved resource reference.
 */
inline rmm::host_async_resource_ref &prior_cuio_host_mr() {
  static rmm::host_async_resource_ref saved_host_mr(cudf::io::get_host_memory_resource());
  return saved_host_mr;
}

/**
* This is a pinned fallback memory resource that will try to allocate from `pool`
* and, if that fails, attempt to allocate from the prior resource used by cuIO (`prior_cuio_host_mr`).
*
* We detect whether a pointer to free is inside of the pool by checking its address (see
* constructor)
*
* Most of this comes directly from `pinned_host_memory_resource` in RMM.
*/
class pinned_fallback_host_memory_resource {
private:
  // Pool tried first for every allocation. This class never frees the pool itself.
  rmm_pinned_pool_t *_pool;
  // [pool_begin_, pool_end_) is the half-open address range recorded for `_pool`:
  // pool_end_ is one byte past the end and therefore never an address inside the pool.
  void *pool_begin_;
  void *pool_end_;

public:
  /**
   * @brief Wraps `pool` and records the address range it hands out.
   *
   * The range is found by allocating the pool's full size once and releasing it
   * immediately. This assumes the full-size allocation succeeds, i.e. that nothing else
   * is allocated from the pool at construction time.
   *
   * @param pool Pinned pool to allocate from before falling back.
   */
  explicit pinned_fallback_host_memory_resource(rmm_pinned_pool_t *pool) : _pool(pool) {
    // allocate from the pinned pool the full size to figure out
    // our beginning and end address.
    auto pool_size = pool->pool_size();
    pool_begin_ = pool->allocate(pool_size);
    pool_end_ = static_cast<void *>(static_cast<uint8_t *>(pool_begin_) + pool_size);
    pool->deallocate(pool_begin_, pool_size);
  }

  // Disable clang-tidy complaining about the easily swappable size and alignment parameters
  // of allocate and deallocate
  // NOLINTBEGIN(bugprone-easily-swappable-parameters)

  /**
   * @brief Allocates host memory of size at least \p bytes bytes from `_pool`, or from
   * `prior_cuio_host_mr` when the pool allocation throws.
   *
   * Any `std::exception` raised by the pool triggers the fallback. Exceptions raised by
   * the fallback resource are not caught and propagate to the caller.
   *
   * @param bytes The size, in bytes, of the allocation.
   * @param alignment Alignment in bytes. Default alignment is used if unspecified.
   *
   * @return Pointer to the newly allocated memory.
   */
  void *allocate(std::size_t bytes, std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) {
    try {
      return _pool->allocate(bytes, alignment);
    } catch (const std::exception &) {
      // the pool could not satisfy the request: use the resource cuIO had before us
      return prior_cuio_host_mr().allocate(bytes, alignment);
    }
  }

  /**
   * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes. The memory is
   * returned to `_pool` when \p ptr lies inside the pool's address range, otherwise it is
   * returned to `prior_cuio_host_mr`.
   *
   * @param ptr Pointer to be deallocated.
   * @param bytes Size of the allocation.
   * @param alignment Alignment in bytes. Default alignment is used if unspecified.
   */
  void deallocate(void *ptr, std::size_t bytes,
                  std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept {
    // Half-open range check: an address equal to pool_end_ is one past the pool and must
    // have come from the fallback resource.
    if (ptr >= pool_begin_ && ptr < pool_end_) {
      _pool->deallocate(ptr, bytes, alignment);
    } else {
      prior_cuio_host_mr().deallocate(ptr, bytes, alignment);
    }
  }

  /**
   * @brief Allocates host memory of size at least \p bytes bytes with the default alignment.
   *
   * @note Stream argument is ignored and behavior is identical to allocate.
   *
   * @param bytes The size, in bytes, of the allocation.
   * @param stream CUDA stream on which to perform the allocation (ignored).
   * @return Pointer to the newly allocated memory.
   */
  void *allocate_async(std::size_t bytes, [[maybe_unused]] cuda::stream_ref stream) {
    return allocate(bytes);
  }

  /**
   * @brief Allocates host memory of size at least \p bytes bytes and alignment \p alignment.
   *
   * @note Stream argument is ignored and behavior is identical to allocate.
   *
   * @param bytes The size, in bytes, of the allocation.
   * @param alignment Alignment in bytes.
   * @param stream CUDA stream on which to perform the allocation (ignored).
   * @return Pointer to the newly allocated memory.
   */
  void *allocate_async(std::size_t bytes, std::size_t alignment,
                       [[maybe_unused]] cuda::stream_ref stream) {
    return allocate(bytes, alignment);
  }

  /**
   * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes.
   *
   * @note Stream argument is ignored and behavior is identical to deallocate.
   *
   * @param ptr Pointer to be deallocated.
   * @param bytes Size of the allocation.
   * @param stream CUDA stream on which to perform the deallocation (ignored).
   */
  void deallocate_async(void *ptr, std::size_t bytes,
                        [[maybe_unused]] cuda::stream_ref stream) noexcept {
    deallocate(ptr, bytes);
  }

  /**
   * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes and alignment \p
   * alignment bytes.
   *
   * @note Stream argument is ignored and behavior is identical to deallocate.
   *
   * @param ptr Pointer to be deallocated.
   * @param bytes Size of the allocation.
   * @param alignment Alignment in bytes.
   * @param stream CUDA stream on which to perform the deallocation (ignored).
   */
  void deallocate_async(void *ptr, std::size_t bytes, std::size_t alignment,
                        [[maybe_unused]] cuda::stream_ref stream) noexcept {
    deallocate(ptr, bytes, alignment);
  }
  // NOLINTEND(bugprone-easily-swappable-parameters)

  /**
   * @briefreturn{true if the specified resource is the same type as this resource.}
   *
   * @note The wrapped pool is not compared: any two instances compare equal.
   */
  bool operator==(const pinned_fallback_host_memory_resource &) const { return true; }

  /**
   * @briefreturn{true if the specified resource is not the same type as this resource, otherwise
   * false.}
   *
   * @note Always false, mirroring operator==.
   */
  bool operator!=(const pinned_fallback_host_memory_resource &) const { return false; }

  /**
   * @brief Enables the `cuda::mr::device_accessible` property
   *
   * This property declares that a `pinned_fallback_host_memory_resource` provides device
   * accessible memory
   */
  friend void get_property(pinned_fallback_host_memory_resource const &,
                           cuda::mr::device_accessible) noexcept {}

  /**
   * @brief Enables the `cuda::mr::host_accessible` property
   *
   * This property declares that a `pinned_fallback_host_memory_resource` provides host
   * accessible memory
   */
  friend void get_property(pinned_fallback_host_memory_resource const &,
                           cuda::mr::host_accessible) noexcept {}
};

// carryover from RMM pinned_host_memory_resource:
// compile-time check that the fallback resource satisfies the async resource concept
// with both the device_accessible and host_accessible properties.
static_assert(
cuda::mr::async_resource_with<pinned_fallback_host_memory_resource, cuda::mr::device_accessible,
cuda::mr::host_accessible>);

// Owns the fallback resource once setCuioPinnedPoolMemoryResource has installed it into
// cuIO; empty before that, and reset again by releasePinnedPoolMemoryResource.
std::unique_ptr<pinned_fallback_host_memory_resource> pinned_fallback_mr;

} // anonymous namespace

extern "C" {
Expand Down Expand Up @@ -760,11 +942,30 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Rmm_newPinnedPoolMemoryResource(JNIE
CATCH_STD(env, 0)
}

// Installs a fallback resource built around the given pinned pool as cuIO's host memory
// resource and remembers the resource cuIO was using before.
// NOTE(review): the Java declaration of this method in this change returns `long`, while
// this function is declared void — confirm and align the two.
JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_setCuioPinnedPoolMemoryResource(JNIEnv *env,
                                                                               jclass clazz,
                                                                               jlong pool_ptr) {
  try {
    cudf::jni::auto_set_device(env);
    auto *const pinned_pool = reinterpret_cast<rmm_pinned_pool_t *>(pool_ptr);
    // the wrapper tries the pinned pool first and falls back to cuIO's prior
    // resource when the pool cannot satisfy a request
    pinned_fallback_mr = std::make_unique<pinned_fallback_host_memory_resource>(pinned_pool);
    // hand the wrapper to cuIO, then stash whatever cuIO was using before
    auto previous_mr = cudf::io::set_host_memory_resource(*pinned_fallback_mr);
    prior_cuio_host_mr() = previous_mr;
  }
  CATCH_STD(env, )
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_releasePinnedPoolMemoryResource(JNIEnv *env,
jclass clazz,
jlong pool_ptr) {
try {
cudf::jni::auto_set_device(env);
// set the cuio host memory resource to what it was before, or the same
// if we didn't overwrite it with setCuioPinnedPoolMemoryResource
cudf::io::set_host_memory_resource(prior_cuio_host_mr());
pinned_fallback_mr.reset();
delete reinterpret_cast<rmm_pinned_pool_t *>(pool_ptr);
}
CATCH_STD(env, )
Expand All @@ -791,4 +992,24 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_freeFromPinnedPool(JNIEnv *env, j
}
CATCH_STD(env, )
}

// only for tests
// Allocates `size` bytes through cuIO's current host memory resource and returns the
// address as a jlong. Any C++ exception is converted to a pending Java exception by
// CATCH_STD (returning 0) instead of unwinding across the JNI boundary.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Rmm_allocFromFallbackPinnedPool(JNIEnv *env,
                                                                            jclass clazz,
                                                                            jlong size) {
  try {
    cudf::jni::auto_set_device(env);
    void *ret = cudf::io::get_host_memory_resource().allocate(size);
    return reinterpret_cast<jlong>(ret);
  }
  CATCH_STD(env, 0)
}

// only for tests
// Returns `size` bytes at address `ptr` to cuIO's current host memory resource.
JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_freeFromFallbackPinnedPool(JNIEnv *env, jclass clazz,
                                                                          jlong ptr, jlong size) {
  try {
    cudf::jni::auto_set_device(env);
    auto host_mr = cudf::io::get_host_memory_resource();
    host_mr.deallocate(reinterpret_cast<void *>(ptr), size);
  }
  CATCH_STD(env, )
}
}
20 changes: 20 additions & 0 deletions java/src/test/java/ai/rapids/cudf/PinnedMemoryPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,24 @@ void testZeroSizedAllocation() {
assertEquals(0, buffer.getLength());
}
}

// This test simulates cuIO using our fallback pinned pool wrapper
// we should be able to either go to the pool, in this case 15KB in size
// or we should be falling back to pinned cudaMallocHost/cudaFreeHost.
@Test
void testFallbackPinnedPool() {
  final long poolBytes = 15 * 1024L;
  final long smallBytes = 1347;

  PinnedMemoryPool.initialize(poolBytes);
  assertEquals(poolBytes, PinnedMemoryPool.getTotalPoolSizeBytes());

  // Round 1: a small request (expected to be served by the pool) followed by a
  // pool-sized request (expected to fall back because the pool is partly in use).
  long first = Rmm.allocFromFallbackPinnedPool(smallBytes);
  long second = Rmm.allocFromFallbackPinnedPool(poolBytes);
  Rmm.freeFromFallbackPinnedPool(first, smallBytes); // expected: back to the pool
  Rmm.freeFromFallbackPinnedPool(second, poolBytes); // expected: back to the fallback

  // Round 2: the first pool-sized request is expected to take the whole pool, so the
  // second one is expected to fall back.
  first = Rmm.allocFromFallbackPinnedPool(poolBytes);
  second = Rmm.allocFromFallbackPinnedPool(poolBytes);
  Rmm.freeFromFallbackPinnedPool(first, poolBytes); // expected: back to the pool
  Rmm.freeFromFallbackPinnedPool(second, poolBytes); // expected: back to the fallback
}
}
Loading