diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 17f05a9baf6..6cb34683e5a 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -106,9 +106,10 @@ private static void freeInternal(long address, long origLength) { * Initialize the pool. * * @param poolSize size of the pool to initialize. + * @note when using this method, the pinned pool will be shared with cuIO */ public static synchronized void initialize(long poolSize) { - initialize(poolSize, -1); + initialize(poolSize, -1, true); } /** @@ -116,8 +117,20 @@ public static synchronized void initialize(long poolSize) { * * @param poolSize size of the pool to initialize. * @param gpuId gpu id to set to get memory pool from, -1 means to use default + * @note when using this method, the pinned pool will be shared with cuIO */ public static synchronized void initialize(long poolSize, int gpuId) { + initialize(poolSize, gpuId, true); + } + + /** + * Initialize the pool. + * + * @param poolSize size of the pool to initialize. + * @param gpuId gpu id to set to get memory pool from, -1 means to use default + * @param setCuioHostMemoryResource true if this pinned pool should be used by cuIO for host memory + */ + public static synchronized void initialize(long poolSize, int gpuId, boolean setCuioHostMemoryResource) { if (isInitialized()) { throw new IllegalStateException("Can only initialize the pool once."); } @@ -126,7 +139,7 @@ public static synchronized void initialize(long poolSize, int gpuId) { t.setDaemon(true); return t; }); - initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize, gpuId)); + initFuture = initService.submit(() -> new PinnedMemoryPool(poolSize, gpuId, setCuioHostMemoryResource)); initService.shutdown(); } @@ -203,13 +216,16 @@ public static long getTotalPoolSizeBytes() { return 0; } - private PinnedMemoryPool(long poolSize, int gpuId) { + private PinnedMemoryPool(long poolSize, int gpuId, boolean setCuioHostMemoryResource) { if (gpuId > -1) { // set the gpu device to use Cuda.setDevice(gpuId); Cuda.freeZero(); } this.poolHandle = Rmm.newPinnedPoolMemoryResource(poolSize, poolSize); + if (setCuioHostMemoryResource) { + Rmm.setCuioPinnedPoolMemoryResource(this.poolHandle); + } this.poolSize = poolSize; } diff --git a/java/src/main/java/ai/rapids/cudf/Rmm.java b/java/src/main/java/ai/rapids/cudf/Rmm.java index 552da62382a..6e9f90e477f 100755 --- a/java/src/main/java/ai/rapids/cudf/Rmm.java +++ b/java/src/main/java/ai/rapids/cudf/Rmm.java @@ -584,9 +584,17 @@ static native long newEventHandlerResourceAdaptor(long handle, long trackerHandl public static native long newPinnedPoolMemoryResource(long initSize, long maxSize); + public static native long setCuioPinnedPoolMemoryResource(long poolPtr); + public static native void releasePinnedPoolMemoryResource(long poolPtr); public static native long allocFromPinnedPool(long poolPtr, long size); public static native void freeFromPinnedPool(long poolPtr, long ptr, long size); + + // only for tests + public static native long allocFromFallbackPinnedPool(long size); + + // only for tests + public static native void freeFromFallbackPinnedPool(long ptr, long size); } diff --git a/java/src/main/native/src/RmmJni.cpp b/java/src/main/native/src/RmmJni.cpp index 7b81b5ff4de..68af350d5fe 100644 --- a/java/src/main/native/src/RmmJni.cpp +++ b/java/src/main/native/src/RmmJni.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -366,6 +367,187 @@ class java_debug_event_handler_memory_resource final : public java_event_handler } }; +inline auto &prior_cuio_host_mr() { + static rmm::host_async_resource_ref _prior_cuio_host_mr = cudf::io::get_host_memory_resource(); + return _prior_cuio_host_mr; +} + +/** + * This is a pinned fallback memory resource that will try to allocate `pool` + * and if that fails, attempt to allocate from the prior resource used by cuIO `prior_cuio_host_mr`. + * + * We detect whether a pointer to free is inside of the pool by checking its address (see + * constructor) + * + * Most of this comes directly from `pinned_host_memory_resource` in RMM. + */ +class pinned_fallback_host_memory_resource { +private: + rmm_pinned_pool_t *_pool; + void *pool_begin_; + void *pool_end_; + +public: + pinned_fallback_host_memory_resource(rmm_pinned_pool_t *pool) : _pool(pool) { + // allocate from the pinned pool the full size to figure out + // our beginning and end address. + auto pool_size = pool->pool_size(); + pool_begin_ = pool->allocate(pool_size); + pool_end_ = static_cast(static_cast(pool_begin_) + pool_size); + pool->deallocate(pool_begin_, pool_size); + } + + // Disable clang-tidy complaining about the easily swappable size and alignment parameters + // of allocate and deallocate + // NOLINTBEGIN(bugprone-easily-swappable-parameters) + + /** + * @brief Allocates pinned host memory of size at least \p bytes bytes from either the + * _pool argument provided, or prior_cuio_host_mr. + * + * @throws rmm::bad_alloc if the requested allocation could not be fulfilled due to any other + * reason. + * + * @param bytes The size, in bytes, of the allocation. + * @param alignment Alignment in bytes. Default alignment is used if unspecified. + * + * @return Pointer to the newly allocated memory. + */ + void *allocate(std::size_t bytes, + [[maybe_unused]] std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) { + try { + return _pool->allocate(bytes, alignment); + } catch (const std::exception &unused) { + // try to allocate using the underlying pinned resource + return prior_cuio_host_mr().allocate(bytes, alignment); + } + // we should not reached here + return nullptr; + } + + /** + * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes. We attempt + * to deallocate from _pool, if ptr is detected to be in the pool address range, + * otherwise we deallocate from `prior_cuio_host_mr`. + * + * @param ptr Pointer to be deallocated. + * @param bytes Size of the allocation. + * @param alignment Alignment in bytes. Default alignment is used if unspecified. + */ + void deallocate(void *ptr, std::size_t bytes, + std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept { + if (ptr >= pool_begin_ && ptr <= pool_end_) { + _pool->deallocate(ptr, bytes, alignment); + } else { + prior_cuio_host_mr().deallocate(ptr, bytes, alignment); + } + } + + /** + * @brief Allocates pinned host memory of size at least \p bytes bytes. + * + * @note Stream argument is ignored and behavior is identical to allocate. + * + * @throws rmm::out_of_memory if the requested allocation could not be fulfilled due to to a + * CUDA out of memory error. + * @throws rmm::bad_alloc if the requested allocation could not be fulfilled due to any other + * error. + * + * @param bytes The size, in bytes, of the allocation. + * @param stream CUDA stream on which to perform the allocation (ignored). + * @return Pointer to the newly allocated memory. + */ + void *allocate_async(std::size_t bytes, [[maybe_unused]] cuda::stream_ref stream) { + return allocate(bytes); + } + + /** + * @brief Allocates pinned host memory of size at least \p bytes bytes and alignment \p alignment. + * + * @note Stream argument is ignored and behavior is identical to allocate. + * + * @throws rmm::out_of_memory if the requested allocation could not be fulfilled due to to a + * CUDA out of memory error. + * @throws rmm::bad_alloc if the requested allocation could not be fulfilled due to any other + * error. + * + * @param bytes The size, in bytes, of the allocation. + * @param alignment Alignment in bytes. + * @param stream CUDA stream on which to perform the allocation (ignored). + * @return Pointer to the newly allocated memory. + */ + void *allocate_async(std::size_t bytes, std::size_t alignment, + [[maybe_unused]] cuda::stream_ref stream) { + return allocate(bytes, alignment); + } + + /** + * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes. + * + * @note Stream argument is ignored and behavior is identical to deallocate. + * + * @param ptr Pointer to be deallocated. + * @param bytes Size of the allocation. + * @param stream CUDA stream on which to perform the deallocation (ignored). + */ + void deallocate_async(void *ptr, std::size_t bytes, + [[maybe_unused]] cuda::stream_ref stream) noexcept { + return deallocate(ptr, bytes); + } + + /** + * @brief Deallocate memory pointed to by \p ptr of size \p bytes bytes and alignment \p + * alignment bytes. + * + * @note Stream argument is ignored and behavior is identical to deallocate. + * + * @param ptr Pointer to be deallocated. + * @param bytes Size of the allocation. + * @param alignment Alignment in bytes. + * @param stream CUDA stream on which to perform the deallocation (ignored). + */ + void deallocate_async(void *ptr, std::size_t bytes, std::size_t alignment, + [[maybe_unused]] cuda::stream_ref stream) noexcept { + return deallocate(ptr, bytes, alignment); + } + // NOLINTEND(bugprone-easily-swappable-parameters) + + /** + * @briefreturn{true if the specified resource is the same type as this resource.} + */ + bool operator==(const pinned_fallback_host_memory_resource &) const { return true; } + + /** + * @briefreturn{true if the specified resource is not the same type as this resource, otherwise + * false.} + */ + bool operator!=(const pinned_fallback_host_memory_resource &) const { return false; } + + /** + * @brief Enables the `cuda::mr::device_accessible` property + * + * This property declares that a `pinned_host_memory_resource` provides device accessible memory + */ + friend void get_property(pinned_fallback_host_memory_resource const &, + cuda::mr::device_accessible) noexcept {} + + /** + * @brief Enables the `cuda::mr::host_accessible` property + * + * This property declares that a `pinned_host_memory_resource` provides host accessible memory + */ + friend void get_property(pinned_fallback_host_memory_resource const &, + cuda::mr::host_accessible) noexcept {} +}; + +// carryover from RMM pinned_host_memory_resource +static_assert( + cuda::mr::async_resource_with); + +// we set this to our fallback resource if we have set it. +std::unique_ptr pinned_fallback_mr; + } // anonymous namespace extern "C" { @@ -760,11 +942,30 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Rmm_newPinnedPoolMemoryResource(JNIE CATCH_STD(env, 0) } +JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_setCuioPinnedPoolMemoryResource(JNIEnv *env, + jclass clazz, + jlong pool_ptr) { + try { + cudf::jni::auto_set_device(env); + auto pool = reinterpret_cast(pool_ptr); + // create a pinned fallback pool that will allocate pinned memory + // if the regular pinned pool is exhausted + pinned_fallback_mr.reset(new pinned_fallback_host_memory_resource(pool)); + // set the cuio host mr and store the prior resource in our static variable + prior_cuio_host_mr() = cudf::io::set_host_memory_resource(*pinned_fallback_mr); + } + CATCH_STD(env, ) +} + JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_releasePinnedPoolMemoryResource(JNIEnv *env, jclass clazz, jlong pool_ptr) { try { cudf::jni::auto_set_device(env); + // set the cuio host memory resource to what it was before, or the same + // if we didn't overwrite it with setCuioPinnedPoolMemoryResource + cudf::io::set_host_memory_resource(prior_cuio_host_mr()); + pinned_fallback_mr.reset(); delete reinterpret_cast(pool_ptr); } CATCH_STD(env, ) @@ -791,4 +992,24 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_freeFromPinnedPool(JNIEnv *env, j } CATCH_STD(env, ) } + +// only for tests +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Rmm_allocFromFallbackPinnedPool(JNIEnv *env, + jclass clazz, + jlong size) { + cudf::jni::auto_set_device(env); + void *ret = cudf::io::get_host_memory_resource().allocate(size); + return reinterpret_cast(ret); +} + +// only for tests +JNIEXPORT void JNICALL Java_ai_rapids_cudf_Rmm_freeFromFallbackPinnedPool(JNIEnv *env, jclass clazz, + jlong ptr, jlong size) { + try { + cudf::jni::auto_set_device(env); + void *cptr = reinterpret_cast(ptr); + cudf::io::get_host_memory_resource().deallocate(cptr, size); + } + CATCH_STD(env, ) +} } diff --git a/java/src/test/java/ai/rapids/cudf/PinnedMemoryPoolTest.java b/java/src/test/java/ai/rapids/cudf/PinnedMemoryPoolTest.java index 8c6e29dbd0c..82182adbb70 100644 --- a/java/src/test/java/ai/rapids/cudf/PinnedMemoryPoolTest.java +++ b/java/src/test/java/ai/rapids/cudf/PinnedMemoryPoolTest.java @@ -140,4 +140,24 @@ void testZeroSizedAllocation() { assertEquals(0, buffer.getLength()); } } + + // This test simulates cuIO using our fallback pinned pool wrapper + // we should be able to either go to the pool, in this case 15KB in size + // or we should be falling back to pinned cudaMallocHost/cudaFreeHost. + @Test + void testFallbackPinnedPool() { + final long poolSize = 15 * 1024L; + PinnedMemoryPool.initialize(poolSize); + assertEquals(poolSize, PinnedMemoryPool.getTotalPoolSizeBytes()); + + long ptr = Rmm.allocFromFallbackPinnedPool(1347); // this doesn't fallback + long ptr2 = Rmm.allocFromFallbackPinnedPool(15 * 1024L); // this does + Rmm.freeFromFallbackPinnedPool(ptr, 1347); // free from pool + Rmm.freeFromFallbackPinnedPool(ptr2, 15*1024); // free from fallback + + ptr = Rmm.allocFromFallbackPinnedPool(15*1024L); // this doesn't fallback + ptr2 = Rmm.allocFromFallbackPinnedPool(15*1024L); // this does + Rmm.freeFromFallbackPinnedPool(ptr, 15*1024L); // free from pool + Rmm.freeFromFallbackPinnedPool(ptr2, 15*1024L); // free from fallback + } }