diff --git a/include/rmm/mr/device/fallback_resource_adaptor.hpp b/include/rmm/mr/device/fallback_resource_adaptor.hpp new file mode 100644 index 000000000..8083181b5 --- /dev/null +++ b/include/rmm/mr/device/fallback_resource_adaptor.hpp @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +namespace RMM_NAMESPACE { +namespace mr { +/** + * @addtogroup device_resource_adaptors + * @{ + * @file + */ + +/** + * @brief A device memory resource that uses an alternate upstream resource when the primary + * upstream resource throws a specified exception type. + * + * An instance of this resource must be constructed with two upstream resources. Pointers obtained + * from the alternate upstream are recorded so that they are deallocated through that same upstream. + * + * @tparam ExceptionType The type of exception that this adaptor should respond to. + */ +template +class fallback_resource_adaptor final : public device_memory_resource { + public: + using exception_type = ExceptionType; ///< The type of exception that triggers the fallback + + /** + * @brief Construct a new `fallback_resource_adaptor` that uses `primary_upstream` + * to satisfy allocation requests and if that fails with `ExceptionType`, uses + * `alternate_upstream`. 
+ * + * @param primary_upstream The primary resource used for allocating/deallocating device memory + * @param alternate_upstream The alternate resource used for allocating/deallocating device + * memory when the primary upstream throws `ExceptionType` + */ + fallback_resource_adaptor(device_async_resource_ref primary_upstream, + device_async_resource_ref alternate_upstream) + : primary_upstream_{primary_upstream}, alternate_upstream_{alternate_upstream} + { + } + + fallback_resource_adaptor() = delete; + ~fallback_resource_adaptor() override = default; + fallback_resource_adaptor(fallback_resource_adaptor const&) = delete; + fallback_resource_adaptor& operator=(fallback_resource_adaptor const&) = delete; + fallback_resource_adaptor(fallback_resource_adaptor&&) noexcept = + default; ///< @default_move_constructor + fallback_resource_adaptor& operator=(fallback_resource_adaptor&&) noexcept = + default; ///< @default_move_assignment{fallback_resource_adaptor} + // NOTE(review): `mtx_` is a `std::mutex`, which is not movable, so these defaulted moves are presumably deleted - confirm intent + /** + * @briefreturn{rmm::device_async_resource_ref to the primary upstream resource} + */ + [[nodiscard]] rmm::device_async_resource_ref get_upstream_resource() const noexcept + { + return primary_upstream_; + } + + /** + * @briefreturn{rmm::device_async_resource_ref to the alternate upstream resource} + */ + [[nodiscard]] rmm::device_async_resource_ref get_alternate_upstream_resource() const noexcept + { + return alternate_upstream_; + } + + private: + /** + * @brief Allocates memory of size at least `bytes` from the primary upstream, falling back to + * the alternate upstream when the primary throws `exception_type`. + * + * @throws Any exception thrown by the alternate upstream, and any exception other than + * `exception_type` thrown by the primary upstream. 
+ * + * @param bytes The size, in bytes, of the allocation + * @param stream Stream on which to perform the allocation + * @return void* Pointer to the newly allocated memory + */ + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override + { + void* ret{}; + try { + ret = primary_upstream_.allocate_async(bytes, stream); + } catch (exception_type const& e) { // `e` is not inspected: any `exception_type` triggers the fallback + ret = alternate_upstream_.allocate_async(bytes, stream); + std::lock_guard lock(mtx_); // guards `alternate_allocations_` + alternate_allocations_.insert(ret); // NOTE(review): if this throws, `ret` is never returned to the alternate upstream + } + return ret; + } + + /** + * @brief Free allocation of size `bytes` pointed to by `ptr`, using the upstream that allocated it + * + * @param ptr Pointer to be deallocated + * @param bytes Size of the allocation + * @param stream Stream on which to perform the deallocation + */ + void do_deallocate(void* ptr, std::size_t bytes, cuda_stream_view stream) override + { + std::size_t count{0}; + { // hold the lock only while updating the tracking set + std::lock_guard lock(mtx_); + count = alternate_allocations_.erase(ptr); // nonzero only if `ptr` was recorded by `do_allocate` + } + if (count > 0) { + alternate_upstream_.deallocate_async(ptr, bytes, stream); + } else { + primary_upstream_.deallocate_async(ptr, bytes, stream); + } + } + + /** + * @brief Compare the resource to another. 
+ * + * @param other The other resource to compare to + * @return true If `other` is this object, or its primary and alternate upstreams both compare equal + * @return false Otherwise, including when the `dynamic_cast` of `other` yields `nullptr` + */ + [[nodiscard]] bool do_is_equal(device_memory_resource const& other) const noexcept override + { + if (this == &other) { return true; } + auto cast = dynamic_cast(&other); + if (cast == nullptr) { return false; } + return get_upstream_resource() == cast->get_upstream_resource() && + get_alternate_upstream_resource() == cast->get_alternate_upstream_resource(); + } + + device_async_resource_ref primary_upstream_; ///< Tried first for every allocation + device_async_resource_ref alternate_upstream_; ///< Used when the primary throws `exception_type` + std::unordered_set alternate_allocations_; ///< Pointers handed out by the alternate upstream and not yet freed + mutable std::mutex mtx_; ///< Guards `alternate_allocations_` +}; + +/** @} */ // end of group +} // namespace mr +} // namespace RMM_NAMESPACE diff --git a/python/rmm/rmm/_lib/memory_resource.pxd b/python/rmm/rmm/_lib/memory_resource.pxd index 000a3fe1e..d9178db11 100644 --- a/python/rmm/rmm/_lib/memory_resource.pxd +++ b/python/rmm/rmm/_lib/memory_resource.pxd @@ -97,6 +97,11 @@ cdef class TrackingResourceAdaptor(UpstreamResourceAdaptor): cdef class FailureCallbackResourceAdaptor(UpstreamResourceAdaptor): cdef object _callback +cdef class FallbackResourceAdaptor(UpstreamResourceAdaptor): + cdef readonly DeviceMemoryResource alternate_upstream_mr + + cpdef DeviceMemoryResource get_alternate_upstream(self) + cdef class PrefetchResourceAdaptor(UpstreamResourceAdaptor): pass diff --git a/python/rmm/rmm/_lib/memory_resource.pyx b/python/rmm/rmm/_lib/memory_resource.pyx index 231253e3f..843331215 100644 --- a/python/rmm/rmm/_lib/memory_resource.pyx +++ b/python/rmm/rmm/_lib/memory_resource.pyx @@ -88,6 +88,10 @@ cdef extern from *: # NOTE: Keep extern declarations in .pyx file as much as possible to avoid # leaking dependencies when importing RMM Cython .pxd files + +cdef extern from "rmm/error.hpp" namespace "rmm" nogil: + cdef cppclass out_of_memory + cdef extern from "rmm/mr/device/cuda_memory_resource.hpp" \ namespace "rmm::mr" nogil: 
cdef cppclass cuda_memory_resource(device_memory_resource): @@ -129,7 +133,6 @@ cdef extern from "rmm/mr/device/cuda_async_memory_resource.hpp" \ win32 win32_kmt - cdef extern from "rmm/mr/device/pool_memory_resource.hpp" \ namespace "rmm::mr" nogil: cdef cppclass pool_memory_resource[Upstream](device_memory_resource): @@ -233,6 +236,19 @@ cdef extern from "rmm/mr/device/failure_callback_resource_adaptor.hpp" \ void* callback_arg ) except + +cdef extern from "rmm/mr/device/fallback_resource_adaptor.hpp" \ + namespace "rmm::mr" nogil: + cdef cppclass fallback_resource_adaptor[ExceptionType]( + device_memory_resource + ): + # NOTE: `fallback_resource_adaptor` takes `device_async_resource_ref` + # upstream arguments, but they are declared here as `device_memory_resource*`, + # relying on implicit conversion from the pointer to the resource ref. + fallback_resource_adaptor( + device_memory_resource* upstream_mr, + device_memory_resource* alternate_upstream_mr, + ) except + + cdef extern from "rmm/mr/device/prefetch_resource_adaptor.hpp" \ namespace "rmm::mr" nogil: cdef cppclass prefetch_resource_adaptor[Upstream](device_memory_resource): @@ -283,7 +299,6 @@ cdef class UpstreamResourceAdaptor(DeviceMemoryResource): """ def __cinit__(self, DeviceMemoryResource upstream_mr, *args, **kwargs): - if (upstream_mr is None): raise Exception("Argument `upstream_mr` must not be None") @@ -1043,6 +1058,46 @@ cdef class FailureCallbackResourceAdaptor(UpstreamResourceAdaptor): """ pass + +cdef class FallbackResourceAdaptor(UpstreamResourceAdaptor): + + def __cinit__( + self, + DeviceMemoryResource upstream_mr, + DeviceMemoryResource alternate_upstream_mr, + ): + if (alternate_upstream_mr is None): + raise Exception("Argument `alternate_upstream_mr` must not be None") + self.alternate_upstream_mr = alternate_upstream_mr # keep a Python reference to the alternate resource on this object + + self.c_obj.reset( + new fallback_resource_adaptor[out_of_memory]( + upstream_mr.get_mr(), + alternate_upstream_mr.get_mr(), + ) + ) + + def __init__( + self, + DeviceMemoryResource 
upstream_mr, + DeviceMemoryResource alternate_upstream_mr, + ): + """ + A memory resource that uses an alternate resource when memory allocation fails. + + Parameters + ---------- + upstream_mr : DeviceMemoryResource + The primary resource used for allocating/deallocating device memory + alternate_upstream_mr : DeviceMemoryResource + The alternate resource used when the primary fails to allocate + """ + pass # the C++ adaptor is created in `__cinit__` + + cpdef DeviceMemoryResource get_alternate_upstream(self): + return self.alternate_upstream_mr + + cdef class PrefetchResourceAdaptor(UpstreamResourceAdaptor): def __cinit__( diff --git a/python/rmm/rmm/mr.py b/python/rmm/rmm/mr.py index 6eb94da0f..baca7e041 100644 --- a/python/rmm/rmm/mr.py +++ b/python/rmm/rmm/mr.py @@ -18,6 +18,7 @@ CudaMemoryResource, DeviceMemoryResource, FailureCallbackResourceAdaptor, + FallbackResourceAdaptor, FixedSizeMemoryResource, LimitingResourceAdaptor, LoggingResourceAdaptor, @@ -61,6 +62,7 @@ "SystemMemoryResource", "TrackingResourceAdaptor", "FailureCallbackResourceAdaptor", + "FallbackResourceAdaptor", "UpstreamResourceAdaptor", "_flush_logs", "_initialize", diff --git a/python/rmm/rmm/tests/test_rmm.py b/python/rmm/rmm/tests/test_rmm.py index c88d21b38..8c978e6f1 100644 --- a/python/rmm/rmm/tests/test_rmm.py +++ b/python/rmm/rmm/tests/test_rmm.py @@ -805,6 +805,57 @@ def callback(nbytes: int) -> bool: assert retried[0] +def test_fallback_resource_adaptor(): + base = rmm.mr.CudaMemoryResource() + + def alloc_cb(size, stream, *, track: list[int], limit: int): + if size > limit: + raise MemoryError() # requests above `limit` fail + ret = base.allocate(size, stream) + track.append(ret) # record the allocated pointer + return ret + + def dealloc_cb(ptr, size, stream, *, track: list[int]): + track.append(ptr) # record the freed pointer + return base.deallocate(ptr, size, stream) + + main_track = [] + main_mr = rmm.mr.CallbackMemoryResource( + functools.partial(alloc_cb, track=main_track, limit=200), + functools.partial(dealloc_cb, track=main_track), + ) + alternate_track = [] + alternate_mr = 
rmm.mr.CallbackMemoryResource( + functools.partial(alloc_cb, track=alternate_track, limit=1000), + functools.partial(dealloc_cb, track=alternate_track), + ) + mr = rmm.mr.FallbackResourceAdaptor(main_mr, alternate_mr) + assert main_mr is mr.get_upstream() + assert alternate_mr is mr.get_alternate_upstream() + + # Delete the upstream memory resources here to check that they are + # kept alive by `mr` + del main_mr + del alternate_mr + + # Buffer size within the limit of `main_mr` + rmm.DeviceBuffer(size=100, mr=mr) + # We expect an alloc and a dealloc of the same buffer in + # `main_track` and an empty `alternate_track` + assert len(main_track) == 2 + assert main_track[0] == main_track[1] + assert len(alternate_track) == 0 + + # Buffer size above the limit of `main_mr` but within that of `alternate_mr` + rmm.DeviceBuffer(size=500, mr=mr) + # We expect an alloc and a dealloc of the same buffer in + # `alternate_track` and an unchanged `main_track` + assert len(main_track) == 2 + assert main_track[0] == main_track[1] + assert len(alternate_track) == 2 + assert alternate_track[0] == alternate_track[1] + + @pytest.mark.parametrize("managed", [True, False]) def test_prefetch_resource_adaptor(managed): if managed: diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ea1af58cd..551727fe8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -146,9 +146,12 @@ ConfigureTest(STATISTICS_TEST mr/device/statistics_mr_tests.cpp) # tracking adaptor tests ConfigureTest(TRACKING_TEST mr/device/tracking_mr_tests.cpp) -# out-of-memory callback adaptor tests +# failure callback adaptor tests ConfigureTest(FAILURE_CALLBACK_TEST mr/device/failure_callback_mr_tests.cpp) +# fallback adaptor tests +ConfigureTest(FAILURE_ALTERNATE_TEST mr/device/fallback_mr_tests.cpp) + # prefetch adaptor tests ConfigureTest(PREFETCH_ADAPTOR_TEST mr/device/prefetch_resource_adaptor_tests.cpp) diff --git a/tests/mr/device/fallback_mr_tests.cpp b/tests/mr/device/fallback_mr_tests.cpp new file mode 100644 index 
000000000..0d6aff726 --- /dev/null +++ b/tests/mr/device/fallback_mr_tests.cpp @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../../byte_literals.hpp" + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +namespace rmm::test { +namespace { +/// Test resource that throws `ExceptionType` for requests above `limit` and otherwise uses `cudaMalloc`. +template +struct throw_at_limit_resource final : public mr::device_memory_resource { + throw_at_limit_resource(std::size_t limit) : limit{limit} {} + + void* do_allocate(std::size_t bytes, cuda_stream_view stream) override + { + if (bytes > limit) { throw ExceptionType{"foo"}; } // requests above `limit` always fail + void* ptr{nullptr}; + RMM_CUDA_TRY_ALLOC(cudaMalloc(&ptr, bytes)); + allocs.insert(ptr); // record the live allocation so tests can inspect it + return ptr; + } + + void do_deallocate(void* ptr, std::size_t bytes, cuda_stream_view stream) override + { + RMM_ASSERT_CUDA_SUCCESS(cudaFree(ptr)); + allocs.erase(ptr); // the pointer is no longer live + } + + [[nodiscard]] bool do_is_equal(mr::device_memory_resource const& other) const noexcept override + { + return this == &other; + } + + const std::size_t limit; ///< Largest request size, in bytes, that does not throw + std::unordered_set allocs{}; ///< Pointers allocated by this resource and not yet freed +}; + +TEST(FailureAlternateTest, TrackBothUpstreams) +{ + throw_at_limit_resource primary_mr{100}; + throw_at_limit_resource alternate_mr{1000}; + rmm::mr::fallback_resource_adaptor mr{primary_mr, alternate_mr}; + + // Check that a small allocation is served and freed by the primary resource + { + void* a1 = mr.allocate(10); + EXPECT_EQ(primary_mr.allocs, 
std::unordered_set{{a1}}); + EXPECT_EQ(alternate_mr.allocs, std::unordered_set{}); + mr.deallocate(a1, 10); + EXPECT_EQ(primary_mr.allocs, std::unordered_set{}); + EXPECT_EQ(alternate_mr.allocs, std::unordered_set{}); + } + + // Check that an allocation too large for the primary is served and freed by the alternate + { + void* a1 = mr.allocate(200); + EXPECT_EQ(primary_mr.allocs, std::unordered_set{}); + EXPECT_EQ(alternate_mr.allocs, std::unordered_set{a1}); + mr.deallocate(a1, 200); + EXPECT_EQ(primary_mr.allocs, std::unordered_set{}); + EXPECT_EQ(alternate_mr.allocs, std::unordered_set{}); + } + + // Check that an exception raised by the alternate resource is not caught + EXPECT_THROW(mr.allocate(2000), rmm::out_of_memory); +} + +TEST(FailureAlternateTest, DifferentExceptionTypes) +{ + throw_at_limit_resource primary_mr{100}; + throw_at_limit_resource alternate_mr{1000}; + rmm::mr::fallback_resource_adaptor mr{primary_mr, alternate_mr}; + + // Check that only `rmm::out_of_memory` is caught: other exception types from the primary propagate + EXPECT_THROW(mr.allocate(200), std::invalid_argument); +} + +} // namespace +} // namespace rmm::test