diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py index 3864a85cb..897e753f2 100644 --- a/dask_cuda/benchmarks/common.py +++ b/dask_cuda/benchmarks/common.py @@ -118,6 +118,7 @@ def run(client: Client, args: Namespace, config: Config): args.rmm_pool_size, args.disable_rmm_pool, args.rmm_log_directory, + args.enable_vmm_pool, ) address_to_index, results, message_data = gather_bench_results(client, args, config) p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index) diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 344549806..bdfe3640d 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -69,6 +69,11 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] cluster_args.add_argument( "--disable-rmm-pool", action="store_true", help="Disable the RMM memory pool" ) + cluster_args.add_argument( + "--enable-vmm-pool", + action="store_true", + help="Replace default RMM pool by a VMM memory pool", + ) cluster_args.add_argument( "--rmm-log-directory", default=None, @@ -311,6 +316,7 @@ def setup_memory_pool( pool_size=None, disable_pool=False, log_directory=None, + enable_vmm_pool=False, ): import cupy @@ -320,7 +326,14 @@ def setup_memory_pool( logging = log_directory is not None - if not disable_pool: + if enable_vmm_pool: + print("Enable RMM VMM pool") + from dask_cuda.vmm_pool import rmm_set_current_vmm_pool + + rmm_set_current_vmm_pool() + cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) + elif not disable_pool: + print("Enable RMM default pool") rmm.reinitialize( pool_allocator=True, devices=0, @@ -331,7 +344,9 @@ def setup_memory_pool( cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) -def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): +def setup_memory_pools( + client, is_gpu, pool_size, disable_pool, log_directory, enable_vmm_pool +): if not is_gpu: return client.run( @@ -339,6 +354,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): pool_size=pool_size, disable_pool=disable_pool, log_directory=log_directory, + enable_vmm_pool=enable_vmm_pool, ) # Create an RMM pool on the scheduler due to occasional deserialization # of CUDA objects. May cause issues with InfiniBand otherwise. @@ -347,6 +363,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): pool_size=1e9, disable_pool=disable_pool, log_directory=log_directory, + enable_vmm_pool=enable_vmm_pool, ) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 2a1754a05..8e462fb73 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -42,6 +42,7 @@ from .get_device_memory_objects import DeviceMemoryId, get_device_memory_ids from .proxify_device_objects import proxify_device_objects, unproxify_device_objects from .proxy_object import ProxyObject +from .vmm_pool import rmm_set_current_vmm_pool T = TypeVar("T") @@ -597,6 +598,8 @@ def oom(nbytes: int) -> bool: # Since we didn't find anything to spill, we give up. 
return False + rmm_set_current_vmm_pool() + current_mr = rmm.mr.get_current_device_resource() mr = rmm.mr.FailureCallbackResourceAdaptor(current_mr, oom) rmm.mr.set_current_device_resource(mr) diff --git a/dask_cuda/rmm_vmm_block_pool.py b/dask_cuda/rmm_vmm_block_pool.py new file mode 100644 index 000000000..18c3ed771 --- /dev/null +++ b/dask_cuda/rmm_vmm_block_pool.py @@ -0,0 +1,225 @@ +from typing import Dict, List, Set, Tuple + +from cuda import cuda, cudart + +from dask_cuda.rmm_vmm_pool import ( + VmmHeap, + check_vmm_gdr_support, + check_vmm_support, + checkCudaErrors, + get_granularity, + to_aligned_size, +) + +CU_VMM_SUPPORTED = ( + cuda.CUdevice_attribute.CU_DEVICE_ATTRIBUTE_VIRTUAL_ADDRESS_MANAGEMENT_SUPPORTED +) +CU_VMM_GDR_SUPPORTED = ( + cuda.CUdevice_attribute.CU_DEVICE_ATTRIBUTE_GPU_DIRECT_RDMA_WITH_CUDA_VMM_SUPPORTED +) +CU_VMM_GRANULARITY = ( + cuda.CUmemAllocationGranularity_flags.CU_MEM_ALLOC_GRANULARITY_MINIMUM +) + + +class VmmBlockPool: + def __init__(self) -> None: + self._block_size: int = 0 + + self._heaps: Dict[cuda.CUdevice, VmmHeap] = {} + + self._mem_handles: Dict[int, cuda.CUmemGenericAllocationHandle] = {} + + self._store_block: Dict[int, int] = {} + self._block_to_mem_handle: Dict[int, int] = {} + self._free_blocks: Set[int] = set() + self._used_blocks: Set[int] = set() + + self._store_user: Dict[int, int] = {} + self._store_user_blocks: Dict[int, List[Tuple[int]]] = {} + + self._allocate_blocks() + + def __del__(self) -> None: + if len(self._store_user) > 0: + print(f"WARN: {len(self._store_user)} user pointers still allocated") + if len(self._used_blocks) > 0: + print(f"WARN: {len(self._used_blocks)} blocks still in use") + + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0))) + + while len(self._store_user) > 0: + ptr, alloc_size = self._store_user.popitem() + checkCudaErrors(cuda.cuMemUnmap(cuda.CUdeviceptr(ptr), alloc_size)) + + while len(self._store_block) > 0: + ptr, alloc_size = self._store_block.popitem() + checkCudaErrors(cuda.cuMemUnmap(cuda.CUdeviceptr(ptr), alloc_size)) + + self._free_blocks.clear() + self._used_blocks.clear() + self._block_to_mem_handle.clear() + + while len(self._mem_handles) > 0: + _, mem_handle = self._mem_handles.popitem() + checkCudaErrors(cuda.cuMemRelease(mem_handle)) + + def get_device(self) -> cuda.CUdevice: + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + return checkCudaErrors(cuda.cuCtxGetDevice()) + + def get_heap(self) -> VmmHeap: + device = self.get_device() + + # Notice, `hash(cuda.CUdevice(0)) != hash(cuda.CUdevice(0))` thus the + # explicit convertion to integer. 
+ if int(device) not in self._heaps: + check_vmm_support(device) + self._heaps[int(device)] = VmmHeap(device, get_granularity(device)) + return self._heaps[int(device)] + + def _allocate_blocks(self, block_size: int = 134217728) -> None: + heap = self.get_heap() + block_size = to_aligned_size(block_size, heap.granularity) + self._block_size = block_size + + # Allocate physical memory + allocation_prop = cuda.CUmemAllocationProp() + allocation_prop.type = cuda.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED + allocation_prop.location.type = ( + cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + ) + allocation_prop.location.id = heap.device + + # Enable IB/GDRCopy support if available + try: + check_vmm_gdr_support(self.get_device()) + except ValueError: + pass + else: + allocation_prop.allocFlags.gpuDirectRDMACapable = 1 + + # Pre-allocate ~30 GiB + # TODO: Replace by user-input factor based on GPU size. + for i in range(240): + mem_handle = checkCudaErrors( + cuda.cuMemCreate(block_size, allocation_prop, 0) + ) + + # Map physical memory to the heap + block_reserve_ptr = heap.allocate(block_size) + checkCudaErrors( + cuda.cuMemMap( + ptr=block_reserve_ptr, + size=block_size, + offset=0, + handle=mem_handle, + flags=0, + ) + ) + # print(f"block_reserve_ptr: {hex(int(block_reserve_ptr))}") + + # Specify both read and write access. + access_desc = cuda.CUmemAccessDesc() + access_desc.location.type = ( + cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + ) + access_desc.location.id = heap.device + access_desc.flags = ( + cuda.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE + ) + checkCudaErrors( + cuda.cuMemSetAccess( + ptr=block_reserve_ptr, size=block_size, desc=[access_desc], count=1 + ) + ) + + self._mem_handles[int(mem_handle)] = mem_handle + self._block_to_mem_handle[int(block_reserve_ptr)] = int(mem_handle) + self._store_block[int(block_reserve_ptr)] = block_size + self._free_blocks.add(int(block_reserve_ptr)) + + def _take_block(self) -> int: + block = self._free_blocks.pop() + self._used_blocks.add(block) + return block + + def _release_block(self, ptr: int) -> None: + self._used_blocks.remove(ptr) + self._free_blocks.add(ptr) + + def _get_block_mem_handle( + self, block_ptr: int + ) -> cuda.CUmemGenericAllocationHandle: + return self._mem_handles[self._block_to_mem_handle[block_ptr]] + + def allocate(self, size: int) -> int: + heap = self.get_heap() + alloc_size = to_aligned_size(size, heap.granularity) + + # Map physical memory to the heap + reserve_ptr = heap.allocate(alloc_size) + # print(f"user reserve_ptr: {hex(int(reserve_ptr))}") + + used_blocks: List[Tuple[int, int]] = [] + + total_allocated_size = 0 + while total_allocated_size < alloc_size: + offset = total_allocated_size + + block = self._take_block() + block_size = self._store_block[block] + block_size = min(block_size, alloc_size - total_allocated_size) + used_blocks.append((block, block_size)) + mem_handle = self._get_block_mem_handle(block) + + total_allocated_size += block_size + # print(total_allocated_size, alloc_size, block_size) + + checkCudaErrors( + cuda.cuMemMap( + ptr=cuda.CUdeviceptr(int(reserve_ptr) + offset), + size=block_size, + offset=0, + handle=mem_handle, + flags=0, + ) + ) + + # Specify both read and write access. 
+ prop = cuda.CUmemAccessDesc() + prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + prop.location.id = heap.device + prop.flags = cuda.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE + checkCudaErrors( + cuda.cuMemSetAccess(ptr=reserve_ptr, size=alloc_size, desc=[prop], count=1) + ) + + self._store_user[int(reserve_ptr)] = alloc_size + self._store_user_blocks[int(reserve_ptr)] = used_blocks + # print(f"alloc({int(reserve_ptr)}) - size: {size}, alloc_size: {alloc_size}") + return int(reserve_ptr) + + def get_allocation_blocks(self, ptr: int) -> List[Tuple[int, int]]: + return self._store_user_blocks[ptr] + + def deallocate(self, ptr: int, size: int) -> None: + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + alloc_size = self._store_user.pop(ptr) + used_blocks = self._store_user_blocks.pop(ptr) + # print( + # f"deallocating {len(used_blocks)} blocks from user allocation " + # f"{hex(int(ptr))}" + # ) + + checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0))) + for block in used_blocks: + block_ptr, block_size = block + # print(f"deallocating: {hex(int(ptr))}, {block_size}") + + checkCudaErrors(cuda.cuMemUnmap(cuda.CUdeviceptr(ptr), alloc_size)) + + self._release_block(block_ptr) + + # print(f"deloc({int(ptr)}) - size: {size}, alloc_size: {alloc_size}") diff --git a/dask_cuda/rmm_vmm_pool.py b/dask_cuda/rmm_vmm_pool.py new file mode 100644 index 000000000..a3e951286 --- /dev/null +++ b/dask_cuda/rmm_vmm_pool.py @@ -0,0 +1,288 @@ +import traceback +from typing import Dict + +from cuda import cuda, cudart, nvrtc + +import rmm.mr + +CU_VMM_SUPPORTED = ( + cuda.CUdevice_attribute.CU_DEVICE_ATTRIBUTE_VIRTUAL_ADDRESS_MANAGEMENT_SUPPORTED +) +CU_VMM_GDR_SUPPORTED = ( + cuda.CUdevice_attribute.CU_DEVICE_ATTRIBUTE_GPU_DIRECT_RDMA_WITH_CUDA_VMM_SUPPORTED +) +CU_VMM_GRANULARITY = ( + cuda.CUmemAllocationGranularity_flags.CU_MEM_ALLOC_GRANULARITY_MINIMUM +) + + +def _cudaGetErrorEnum(error): + if isinstance(error, cuda.CUresult): + err, name = cuda.cuGetErrorName(error) + return name if err == cuda.CUresult.CUDA_SUCCESS else "" + elif isinstance(error, cudart.cudaError_t): + return cudart.cudaGetErrorName(error)[1] + elif isinstance(error, nvrtc.nvrtcResult): + return nvrtc.nvrtcGetErrorString(error)[1] + else: + raise RuntimeError("Unknown error type: {}".format(error)) + + +def checkCudaErrors(result): + if not result[0]: + err = result[0] + msg = "CUDA error code={}({})".format(err.value, _cudaGetErrorEnum(err)) + if err == cuda.CUresult.CUDA_ERROR_OUT_OF_MEMORY: + raise MemoryError(msg) + raise RuntimeError( + msg + "\n" + "\n".join(str(frame) for frame in traceback.extract_stack()) + ) + + if len(result) == 1: + return None + elif len(result) == 2: + return result[1] + else: + return result[1:] + + +def _check_support(dev: cuda.CUdevice, attr: cuda.CUdevice_attribute, msg: str) -> None: + if not checkCudaErrors( + cuda.cuDeviceGetAttribute( + attr, + dev, + ) + ): + raise ValueError(msg) + + +def check_vmm_support(dev: cuda.CUdevice) -> None: + return _check_support( + dev, + CU_VMM_SUPPORTED, + f"Device {dev} doesn't support VIRTUAL ADDRESS MANAGEMENT", + ) + + +def check_vmm_gdr_support(dev: cuda.CUdevice) -> None: + return _check_support( + dev, + CU_VMM_GDR_SUPPORTED, + f"Device {dev} doesn't support GPUDirectRDMA for VIRTUAL ADDRESS MANAGEMENT", + ) + + +def get_granularity(dev: cuda.CUdevice): + prop = cuda.CUmemAllocationProp() + prop.type = cuda.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED + prop.location.type = 
cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + prop.location.id = dev + return checkCudaErrors(cuda.cuMemGetAllocationGranularity(prop, CU_VMM_GRANULARITY)) + + +def to_aligned_size(size: int, granularity: int): + rest = size % granularity + if rest != 0: + size = size + (granularity - rest) + assert size % granularity == 0 + return size + + +class RegularMemAlloc: + def allocate(self, size: int) -> int: + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + device = checkCudaErrors(cuda.cuCtxGetDevice()) + + # Check that the selected device supports virtual address management + check_vmm_support(device) + + alloc_size = to_aligned_size(size, get_granularity(device)) + return int(checkCudaErrors(cuda.cuMemAlloc(alloc_size))) + + def deallocate(self, ptr: int, size: int) -> None: + checkCudaErrors(cuda.cuMemFree(ptr)) + + +class VmmAlloc: + def __init__(self) -> None: + self._store = {} + + def allocate(self, size: int) -> int: + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + device = checkCudaErrors(cuda.cuCtxGetDevice()) + + # Check that the selected device supports virtual address management + check_vmm_support(device) + + alloc_size = to_aligned_size(size, get_granularity(device)) + + prop = cuda.CUmemAllocationProp() + prop.type = cuda.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED + prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + prop.location.id = device + + # Enable IB/GDRCopy support if available + try: + check_vmm_gdr_support(device) + except ValueError: + pass + else: + prop.allocFlags.gpuDirectRDMACapable = 1 + + mem_handle = checkCudaErrors(cuda.cuMemCreate(alloc_size, prop, 0)) + + reserve_ptr = checkCudaErrors( + cuda.cuMemAddressReserve( + size=alloc_size, alignment=0, addr=cuda.CUdeviceptr(0), flags=0 + ) + ) + + checkCudaErrors( + cuda.cuMemMap( + ptr=reserve_ptr, size=alloc_size, offset=0, handle=mem_handle, flags=0 + ) + ) + + # Since we do not need to make any other mappings of this memory or export it, + # we no longer need and can release the mem_alloc. + # The allocation will be kept live until it is unmapped. + checkCudaErrors(cuda.cuMemRelease(mem_handle)) + + # Specify both read and write access. + prop = cuda.CUmemAccessDesc() + prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + prop.location.id = device + prop.flags = cuda.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE + checkCudaErrors( + cuda.cuMemSetAccess(ptr=reserve_ptr, size=alloc_size, desc=[prop], count=1) + ) + + self._store[int(reserve_ptr)] = alloc_size + return int(reserve_ptr) + + def deallocate(self, ptr: int, size: int) -> None: + alloc_size = self._store.pop(ptr) + checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0))) + checkCudaErrors(cuda.cuMemUnmap(cuda.CUdeviceptr(ptr), alloc_size)) + checkCudaErrors(cuda.cuMemAddressFree(cuda.CUdeviceptr(ptr), alloc_size)) + + +class VmmHeap: + def __init__( + self, device: cuda.CUdevice, granularity: int, size: int = 2**40 + ) -> None: + self.granularity = granularity + self.device = device + self._offset = 0 + self._size = size + + # Make a virtual memory reservation. + result = cuda.cuMemAddressReserve( + size=size, alignment=0, addr=cuda.CUdeviceptr(0), flags=0 + ) + # We check the result manually to avoid raise MemoryError, which would + # trigger out-of-memory handling. 
+ if result[0] == cuda.CUresult.CUDA_ERROR_OUT_OF_MEMORY: + raise RuntimeError("cuda.cuMemAddressReserve() - CUDA_ERROR_OUT_OF_MEMORY") + self._heap = int(checkCudaErrors(result)) + + def allocate(self, size: int) -> cuda.CUdeviceptr: + assert size % self.granularity == 0 + ret = self._heap + self._offset + self._offset += to_aligned_size(size, self.granularity) + assert self._offset <= self._size + assert ret % self.granularity == 0 + return cuda.CUdeviceptr(ret) + + +class VmmAllocPool: + def __init__(self) -> None: + self._store: Dict[int, int] = {} + self._heaps: Dict[cuda.CUdevice, VmmHeap] = {} + + def get_device(self) -> cuda.CUdevice: + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + return checkCudaErrors(cuda.cuCtxGetDevice()) + + def get_heap(self) -> VmmHeap: + device = self.get_device() + + # Notice, `hash(cuda.CUdevice(0)) != hash(cuda.CUdevice(0))` thus the + # explicit convertion to integer. + if int(device) not in self._heaps: + check_vmm_support(device) + self._heaps[int(device)] = VmmHeap(device, get_granularity(device)) + return self._heaps[int(device)] + + def allocate(self, size: int) -> int: + heap = self.get_heap() + alloc_size = to_aligned_size(size, heap.granularity) + + # Allocate physical memory + prop = cuda.CUmemAllocationProp() + prop.type = cuda.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED + prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + prop.location.id = heap.device + + # Enable IB/GDRCopy support if available + try: + check_vmm_gdr_support(self.get_device()) + except ValueError: + pass + else: + prop.allocFlags.gpuDirectRDMACapable = 1 + + mem_handle = checkCudaErrors(cuda.cuMemCreate(alloc_size, prop, 0)) + + # Map physical memory to the heap + reserve_ptr = heap.allocate(alloc_size) + checkCudaErrors( + cuda.cuMemMap( + ptr=reserve_ptr, + size=alloc_size, + offset=0, + handle=mem_handle, + flags=0, + ) + ) + + # Since we do not need to make any other mappings of this memory or export it, + # we no longer need and can release the mem_handle. + # The allocation will be kept live until it is unmapped. + checkCudaErrors(cuda.cuMemRelease(mem_handle)) + + # Specify both read and write access. 
+ prop = cuda.CUmemAccessDesc() + prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + prop.location.id = heap.device + prop.flags = cuda.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE + checkCudaErrors( + cuda.cuMemSetAccess(ptr=reserve_ptr, size=alloc_size, desc=[prop], count=1) + ) + + self._store[int(reserve_ptr)] = alloc_size + # print(f"alloc({int(reserve_ptr)}) - size: {size}, alloc_size: {alloc_size}") + return int(reserve_ptr) + + def deallocate(self, ptr: int, size: int) -> None: + checkCudaErrors(cudart.cudaGetDevice()) # TODO: avoid use of the Runtime API + alloc_size = self._store.pop(ptr) + checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0))) + checkCudaErrors(cuda.cuMemUnmap(cuda.CUdeviceptr(ptr), alloc_size)) + # print(f"deloc({int(ptr)}) - size: {size}, alloc_size: {alloc_size}") + + +def set_vmm_pool(): + # store = RegularMemAlloc() + # store = VmmAlloc() + store = VmmAllocPool() + + def allocate(size: int): + return store.allocate(size) + + def deallocate(ptr: int, size: int): + store.deallocate(ptr, size) + + rmm.mr.set_current_device_resource( + rmm.mr.CallbackMemoryResource(allocate, deallocate) + ) diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 1edcab09d..41f9cae69 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -1,3 +1,4 @@ +import gc import re import tempfile from typing import Iterable @@ -21,6 +22,7 @@ from dask_cuda.proxify_host_file import ProxifyHostFile from dask_cuda.proxy_object import ProxyObject, asproxy, unproxy from dask_cuda.utils import get_device_total_memory +from dask_cuda.vmm_pool import rmm_get_current_vmm_pool cupy = pytest.importorskip("cupy") cupy.cuda.set_allocator(None) @@ -219,10 +221,13 @@ def test_spill_on_demand(): and allocating two large buffers that will otherwise fail because of spilling on demand. 
""" + print() rmm = pytest.importorskip("rmm") if not hasattr(rmm.mr, "FailureCallbackResourceAdaptor"): pytest.skip("RMM doesn't implement FailureCallbackResourceAdaptor") + rmm.reinitialize() + total_mem = get_device_total_memory() dhf = ProxifyHostFile( local_directory=root_dir, @@ -230,13 +235,22 @@ def test_spill_on_demand(): memory_limit=2 * total_mem, spill_on_demand=True, ) - for i in range(2): - dhf[i] = rmm.DeviceBuffer(size=total_mem // 2 + 1) + dhf.initialize_spill_on_demand_once() + rmm_get_current_vmm_pool() + dhf["a"] = rmm.DeviceBuffer(size=total_mem // 2 + 1) + dhf["b"] = rmm.DeviceBuffer(size=total_mem // 2 + 1) + del dhf + gc.collect() + vmm_pool = rmm_get_current_vmm_pool() + print("vmm_pool", vmm_pool._allocs) @pytest.mark.parametrize("jit_unspill", [True, False]) def test_local_cuda_cluster(jit_unspill): """Testing spilling of a proxied cudf dataframe in a local cuda cluster""" + vmm_pool = rmm_get_current_vmm_pool() + vmm_pool.clear() + cudf = pytest.importorskip("cudf") dask_cudf = pytest.importorskip("dask_cudf") @@ -273,7 +287,7 @@ def test_dataframes_share_dev_mem(): # Even though the two dataframe doesn't point to the same cudf.Buffer object assert view1["a"].data is not view2["a"].data # They still share the same underlying device memory - assert view1["a"].data._owner._owner is view2["a"].data._owner._owner + assert view1["a"].data._owner.owner is view2["a"].data._owner.owner dhf = ProxifyHostFile( local_directory=root_dir, device_memory_limit=160, memory_limit=1000 @@ -453,9 +467,7 @@ def task(): client.submit(range, 10).result() # Submit too large RMM buffer - with pytest.raises( - MemoryError, match=r".*std::bad_alloc:.*CUDA error at:.*" - ): + with pytest.raises(MemoryError, match=r"std::bad_alloc:.*CUDA"): client.submit(task).result() log = str(client.get_worker_logs()) diff --git a/dask_cuda/tests/test_vmm_block_pool.py b/dask_cuda/tests/test_vmm_block_pool.py new file mode 100644 index 000000000..c89c8e463 --- /dev/null +++ b/dask_cuda/tests/test_vmm_block_pool.py @@ -0,0 +1,100 @@ +import numpy as np +import pytest +from cuda import cuda + +from dask_cuda.rmm_vmm_block_pool import VmmBlockPool, checkCudaErrors + + +@pytest.mark.parametrize( + "size", [1, 1000, 100 * 1024**2, 100 * 1024**2 + 1, int(200e6)] +) +def test_allocate(size): + pool = VmmBlockPool() + + buf = pool.allocate(size) + assert isinstance(buf, int) + assert buf > 0 + pool.deallocate(buf, size) + + +def test_del_warn(capsys): + pool = VmmBlockPool() + + pool.allocate(int(200e6)) + + del pool + + captured = capsys.readouterr() + assert "WARN: 1 user pointers still allocated" in captured.out + # TODO: match regex + assert "blocks still in use" in captured.out + + +@pytest.mark.parametrize( + "size", [1, 1000, 100 * 1024**2, 100 * 1024**2 + 1, int(200e6)] +) +def test_mapping_content(size): + pool = VmmBlockPool() + + d_buf = pool.allocate(size) + + h_buf_in = np.arange(size, dtype="u1") + checkCudaErrors( + cuda.cuMemcpyHtoDAsync( + cuda.CUdeviceptr(d_buf), h_buf_in.ctypes.data, size, cuda.CUstream(0) + ) + ) + + h_buf_out = np.empty(size, dtype="u1") + checkCudaErrors( + cuda.cuMemcpyDtoHAsync( + h_buf_out.ctypes.data, cuda.CUdeviceptr(d_buf), size, cuda.CUstream(0) + ) + ) + + checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0))) + pool.deallocate(d_buf, size) + + np.testing.assert_equal(h_buf_out, h_buf_in) + + +@pytest.mark.parametrize( + "size", [1, 1000, 128 * 1024**2, 128 * 1024**2 + 1, int(200e6)] +) +def test_block_content(size): + pool = VmmBlockPool() + + d_buf = 
pool.allocate(size)
+    blocks = pool.get_allocation_blocks(d_buf)
+
+    h_buf_in = np.arange(size, dtype="u1")
+    offset = 0
+    block_num = 0
+    while offset < size:
+        ptr, block_size = blocks[block_num]
+        block_size = min(block_size, size - offset)
+        checkCudaErrors(
+            cuda.cuMemcpyHtoDAsync(
+                cuda.CUdeviceptr(ptr),
+                h_buf_in.ctypes.data + offset,
+                block_size,
+                cuda.CUstream(0),
+            )
+        )
+
+        h_buf_out = np.empty(block_size, dtype="u1")
+        checkCudaErrors(
+            cuda.cuMemcpyDtoHAsync(
+                h_buf_out.ctypes.data,
+                cuda.CUdeviceptr(ptr),
+                block_size,
+                cuda.CUstream(0),
+            )
+        )
+
+        checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0)))
+        np.testing.assert_equal(h_buf_out, h_buf_in[offset : offset + block_size])
+
+        offset += block_size
+        block_num += 1
+
+    pool.deallocate(d_buf, size)
diff --git a/dask_cuda/tests/test_vmm_pool.py b/dask_cuda/tests/test_vmm_pool.py
new file mode 100644
index 000000000..f28d847be
--- /dev/null
+++ b/dask_cuda/tests/test_vmm_pool.py
@@ -0,0 +1,87 @@
+import numpy as np
+import pytest
+from cuda import cuda
+
+from dask_cuda.vmm_pool import VmmPool, checkCudaErrors
+
+
+@pytest.mark.parametrize(
+    "size", [1, 1000, 100 * 1024**2, 100 * 1024**2 + 1, int(200e6)]
+)
+def test_allocate(size):
+    pool = VmmPool()
+
+    buf = pool.allocate(size)
+    assert isinstance(buf, int)
+    assert buf > 0
+    pool.deallocate(buf, size)
+
+
+@pytest.mark.parametrize(
+    "size", [1, 1000, 100 * 1024**2, 100 * 1024**2 + 1, int(200e6)]
+)
+def test_mapping_content(size):
+    pool = VmmPool()
+
+    d_buf = pool.allocate(size)
+
+    h_buf_in = np.arange(size, dtype="u1")
+    checkCudaErrors(
+        cuda.cuMemcpyHtoDAsync(
+            cuda.CUdeviceptr(d_buf), h_buf_in.ctypes.data, size, cuda.CUstream(0)
+        )
+    )
+
+    h_buf_out = np.empty(size, dtype="u1")
+    checkCudaErrors(
+        cuda.cuMemcpyDtoHAsync(
+            h_buf_out.ctypes.data, cuda.CUdeviceptr(d_buf), size, cuda.CUstream(0)
+        )
+    )
+
+    checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0)))
+    pool.deallocate(d_buf, size)
+
+    np.testing.assert_equal(h_buf_out, h_buf_in)
+
+
+@pytest.mark.parametrize(
+    "size", [1, 1000, 64 * 1024**2, 64 * 1024**2 + 1, int(200e6)]
+)
+def test_block_content(size):
+    pool = VmmPool()
+
+    d_buf = pool.allocate(size)
+    vmm_alloc = pool._allocs[d_buf]
+
+    h_buf_in = np.arange(size, dtype="u1")
+    offset = 0
+    block_num = 0
+    while offset < size:
+        block = vmm_alloc.blocks[block_num]
+        block_size = min(block.size, size - offset)
+        checkCudaErrors(
+            cuda.cuMemcpyHtoDAsync(
+                cuda.CUdeviceptr(block._ptr),
+                h_buf_in.ctypes.data + offset,
+                block_size,
+                cuda.CUstream(0),
+            )
+        )
+
+        h_buf_out = np.empty(block_size, dtype="u1")
+        checkCudaErrors(
+            cuda.cuMemcpyDtoHAsync(
+                h_buf_out.ctypes.data,
+                cuda.CUdeviceptr(block._ptr),
+                block_size,
+                cuda.CUstream(0),
+            )
+        )
+
+        checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0)))
+        np.testing.assert_equal(h_buf_out, h_buf_in[offset : offset + block_size])
+
+        offset += block_size
+        block_num += 1
+
+    pool.deallocate(d_buf, size)
diff --git a/dask_cuda/vmm_pool.py b/dask_cuda/vmm_pool.py
new file mode 100644
index 000000000..3fe99d104
--- /dev/null
+++ b/dask_cuda/vmm_pool.py
@@ -0,0 +1,324 @@
+import math
+import traceback
+from collections import defaultdict
+from dataclasses import dataclass
+from threading import Lock
+from typing import DefaultDict, Dict, List
+from weakref import WeakValueDictionary
+
+from cuda import cuda, cudart, nvrtc
+
+import rmm.mr
+
+CU_VMM_SUPPORTED = (
+    cuda.CUdevice_attribute.CU_DEVICE_ATTRIBUTE_VIRTUAL_ADDRESS_MANAGEMENT_SUPPORTED
+)
+CU_VMM_GDR_SUPPORTED = (
+    cuda.CUdevice_attribute.CU_DEVICE_ATTRIBUTE_GPU_DIRECT_RDMA_WITH_CUDA_VMM_SUPPORTED
+)
+CU_VMM_GRANULARITY = (
+    cuda.CUmemAllocationGranularity_flags.CU_MEM_ALLOC_GRANULARITY_MINIMUM
+)
+
+
+def _cudaGetErrorEnum(error):
+    if isinstance(error, cuda.CUresult):
+        err, name = cuda.cuGetErrorName(error)
+        return name if err == cuda.CUresult.CUDA_SUCCESS else "<unknown>"
+    elif isinstance(error, cudart.cudaError_t):
+        return cudart.cudaGetErrorName(error)[1]
+    elif isinstance(error, nvrtc.nvrtcResult):
+        return nvrtc.nvrtcGetErrorString(error)[1]
+    else:
+        raise RuntimeError("Unknown error type: {}".format(error))
+
+
+def checkCudaErrors(result):
+    if result[0] != cuda.CUresult.CUDA_SUCCESS:
+        err = result[0]
+        msg = "CUDA error code={}({})".format(err.value, _cudaGetErrorEnum(err))
+        if err == cuda.CUresult.CUDA_ERROR_OUT_OF_MEMORY:
+            raise MemoryError(msg)
+        raise RuntimeError(
+            msg + "\n" + "\n".join(str(frame) for frame in traceback.extract_stack())
+        )
+    if len(result) == 1:
+        return None
+    elif len(result) == 2:
+        return result[1]
+    else:
+        return result[1:]
+
+
+def _check_support(dev: cuda.CUdevice, attr: cuda.CUdevice_attribute, msg: str) -> None:
+    if not checkCudaErrors(
+        cuda.cuDeviceGetAttribute(
+            attr,
+            dev,
+        )
+    ):
+        raise ValueError(msg)
+
+
+def check_vmm_support(dev: cuda.CUdevice) -> None:
+    return _check_support(
+        dev,
+        CU_VMM_SUPPORTED,
+        f"Device {dev} doesn't support VIRTUAL ADDRESS MANAGEMENT",
+    )
+
+
+def check_vmm_gdr_support(dev: cuda.CUdevice) -> None:
+    return _check_support(
+        dev,
+        CU_VMM_GDR_SUPPORTED,
+        f"Device {dev} doesn't support GPUDirectRDMA for VIRTUAL ADDRESS MANAGEMENT",
+    )
+
+
+def get_granularity(dev: cuda.CUdevice):
+    prop = cuda.CUmemAllocationProp()
+    prop.type = cuda.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED
+    prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
+    prop.location.id = dev
+    return checkCudaErrors(cuda.cuMemGetAllocationGranularity(prop, CU_VMM_GRANULARITY))
+
+
+def to_aligned_size(size: int, granularity: int):
+    rest = size % granularity
+    if rest != 0:
+        size = size + (granularity - rest)
+    assert size % granularity == 0
+    return size
+
+
+def virtual_memory_reserve(size: int) -> int:
+    # Make a virtual memory reservation.
+    result = cuda.cuMemAddressReserve(
+        size=size, alignment=0, addr=cuda.CUdeviceptr(0), flags=0
+    )
+    # We check the result manually to avoid raising MemoryError, which would
+    # trigger out-of-memory handling.
+    if result[0] == cuda.CUresult.CUDA_ERROR_OUT_OF_MEMORY:
+        raise RuntimeError("cuda.cuMemAddressReserve() - CUDA_ERROR_OUT_OF_MEMORY")
+    return int(checkCudaErrors(result))
+
+
+def virtual_memory_set_access(ptr: int, size: int, device: cuda.CUdevice):
+    # Specify both read and write access.
+    prop = cuda.CUmemAccessDesc()
+    prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
+    prop.location.id = device
+    prop.flags = cuda.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE
+    checkCudaErrors(
+        cuda.cuMemSetAccess(ptr=cuda.CUdeviceptr(ptr), size=size, desc=[prop], count=1)
+    )
+
+
+class VmmBlock:
+    size: int
+
+    def __init__(self, device: cuda.CUdevice, size: int) -> None:
+        self._device = device
+        self.size = size
+
+        # Allocate physical memory
+        prop = cuda.CUmemAllocationProp()
+        prop.type = cuda.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED
+        prop.location.type = cuda.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
+        prop.location.id = device
+
+        # Enable IB/GDRCopy support if available
+        try:
+            check_vmm_gdr_support(self._device)
+        except ValueError:
+            pass
+        else:
+            prop.allocFlags.gpuDirectRDMACapable = 1
+
+        self.mem_handle = checkCudaErrors(cuda.cuMemCreate(size, prop, 0))
+
+        # Make a virtual memory reservation.
+        self._ptr = virtual_memory_reserve(size)
+
+        # Map physical memory to the virtual memory
+        checkCudaErrors(
+            cuda.cuMemMap(
+                ptr=cuda.CUdeviceptr(self._ptr),
+                size=size,
+                offset=0,
+                handle=self.mem_handle,
+                flags=0,
+            )
+        )
+
+        # Specify both read and write access.
+        virtual_memory_set_access(ptr=self._ptr, size=size, device=device)
+
+    def __repr__(self) -> str:
+        return f"<VmmBlock size={self.size}>"
+
+
+@dataclass
+class DeviceInfo:
+    device: cuda.CUdevice
+    granularity: int
+
+
+@dataclass
+class VmmAlloc:
+    size: int
+    blocks: List[VmmBlock]
+    ptr: int
+
+
+class VmmPool:
+    def __init__(self) -> None:
+        self._max_block_size = 2**29  # 512MiB
+        self._pool: DefaultDict[int, List[VmmBlock]] = defaultdict(list)
+        self._allocs: Dict[int, VmmAlloc] = {}
+        self._lock = Lock()
+
+    def clear(self) -> None:
+        with self._lock:
+            # Free all blocks in the pool
+            for blocks in self._pool.values():
+                for block in blocks:
+                    checkCudaErrors(
+                        cuda.cuMemUnmap(cuda.CUdeviceptr(block._ptr), block.size)
+                    )
+                    # checkCudaErrors(cuda.cuMemAddressFree(alloc.ptr, alloc.size))
+
+    def __del__(self):
+        self.clear()
+
+    def get_device_info(self) -> DeviceInfo:
+        checkCudaErrors(cudart.cudaGetDevice())  # TODO: avoid use of the Runtime API
+        device = checkCudaErrors(cuda.cuCtxGetDevice())
+        granularity = get_granularity(device)
+        return DeviceInfo(device=device, granularity=granularity)
+
+    def get_block(self, size: int, dev_info: DeviceInfo) -> VmmBlock:
+        # The block size is the highest power of 2 not larger than `size`
+        block_size = 2 ** int(math.log2(size))
+        # but not larger than the maximum block size
+        block_size = min(block_size, self._max_block_size)
+        # and not smaller than the granularity
+        block_size = max(block_size, dev_info.granularity)
+        # print(f"get_block({size}) - block_size: {block_size}")
+
+        blocks = self._pool.get(block_size, [])
+        if blocks:
+            return blocks.pop()
+        return VmmBlock(device=dev_info.device, size=block_size)
+
+    def get_blocks(self, min_size: int, dev_info: DeviceInfo) -> List[VmmBlock]:
+        cur_size = min_size
+        ret = []
+        while cur_size > 0:
+            block = self.get_block(size=cur_size, dev_info=dev_info)
+            cur_size -= block.size
+            ret.append(block)
+        return ret
+
+    def allocate(self, size: int) -> int:
+        with self._lock:
+            dev_info = self.get_device_info()
+            alloc_size = to_aligned_size(size, granularity=dev_info.granularity)
+            blocks = self.get_blocks(min_size=alloc_size, dev_info=dev_info)
+            ptr = virtual_memory_reserve(alloc_size)
+            print(
+                f"allocate({hex(ptr)}) - size: {size}, alloc_size: {alloc_size}, "
+                f"blocks: {blocks}"
+            )
+
+            # Map the physical memory of each block to the virtual memory
+            cur_ptr = ptr
+            for block in blocks:
+                checkCudaErrors(
+                    cuda.cuMemMap(
+                        ptr=cuda.CUdeviceptr(cur_ptr),
+                        size=block.size,
+                        offset=0,
+                        handle=block.mem_handle,
+                        flags=0,
+                    )
+                )
+                cur_ptr += block.size
+
+            # Specify both read and write access.
+            virtual_memory_set_access(ptr=ptr, size=alloc_size, device=dev_info.device)
+
+            self._allocs[ptr] = VmmAlloc(blocks=blocks, size=alloc_size, ptr=ptr)
+            return ptr
+
+    def deallocate(self, ptr: int, size: int) -> None:
+        with self._lock:
+            checkCudaErrors(cudart.cudaGetDevice())
+            checkCudaErrors(cuda.cuStreamSynchronize(cuda.CUstream(0)))
+            alloc = self._allocs.pop(ptr)
+            assert alloc.ptr == ptr
+
+            print(
+                f"deallocate({hex(ptr)}) - size: {size}, alloc_size: {alloc.size}, "
+                f"blocks: {alloc.blocks}"
+            )
+
+            # Move all blocks of the allocation back to the pool
+            cur_ptr = ptr
+            for block in alloc.blocks:
+                self._pool[block.size].append(block)
+                checkCudaErrors(cuda.cuMemUnmap(cuda.CUdeviceptr(cur_ptr), block.size))
+                cur_ptr += block.size
+
+            # # Free up the previously reserved virtual memory
+            # checkCudaErrors(cuda.cuMemAddressFree(alloc.ptr, alloc.size))
+
+
+_vmm_pools = WeakValueDictionary()
+
+
+def rmm_get_current_vmm_pool() -> VmmPool:
+    def get_stack(mr):
+        if hasattr(mr, "upstream_mr"):
+            return [mr] + get_stack(mr.upstream_mr)
+        return [mr]
+
+    print(
+        "rmm_get_current_vmm_pool() - stack: ",
+        get_stack(rmm.mr.get_current_device_resource()),
+    )
+    for mr in get_stack(rmm.mr.get_current_device_resource()):
+        if id(mr) in _vmm_pools:
+            return _vmm_pools[id(mr)]
+    raise ValueError("No VMM pool found in the current RMM resource stack")
+
+
+def rmm_set_current_vmm_pool(skip_if_exist=True) -> None:
+
+    try:
+        rmm_get_current_vmm_pool()
+    except ValueError:
+        pass
+    else:
+        if skip_if_exist:
+            return
+        raise ValueError("A VMM pool is already set")
+
+    vmm_pool = VmmPool()
+
+    def allocate(size: int):
+        return vmm_pool.allocate(size)
+
+    def deallocate(ptr: int, size: int):
+        try:
+            # TODO: fix `StopIteration` exceptions that are sometimes raised
+            vmm_pool.deallocate(ptr, size)
+        except StopIteration as e:
+            print(f"Exception: {type(e)}")
+            pass
+
+    mr = rmm.mr.CallbackMemoryResource(allocate, deallocate)
+    _vmm_pools[id(mr)] = vmm_pool
+    rmm.mr.set_current_device_resource(mr)
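
Reviewer note: a minimal usage sketch of the new pool (not part of the patch), assuming a GPU with CUDA VMM support and the `cuda-python` and `rmm` packages installed; it only uses functions introduced in `dask_cuda/vmm_pool.py` above.

# Sketch: route RMM allocations through the new VMM pool and inspect it afterwards.
import rmm

from dask_cuda.vmm_pool import rmm_get_current_vmm_pool, rmm_set_current_vmm_pool

# Install a CallbackMemoryResource backed by a VmmPool as RMM's current resource.
rmm_set_current_vmm_pool()

# Allocations now go through VmmPool.allocate()/VmmPool.deallocate().
buf = rmm.DeviceBuffer(size=64 * 1024**2)
del buf

# The pool behind the current resource can be retrieved for inspection,
# e.g. to check that no user allocations are outstanding.
vmm_pool = rmm_get_current_vmm_pool()
assert len(vmm_pool._allocs) == 0

In the benchmarks the same path is selected with the new `--enable-vmm-pool` flag, which calls `rmm_set_current_vmm_pool()` from `setup_memory_pool()`.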
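
The `proxify_host_file.py` change above installs the VMM pool before wrapping the current resource in RMM's failure-callback adaptor. A sketch of the resulting resource stack follows; the `oom` handler below is a stand-in for dask-cuda's spill-on-demand callback, not the real implementation.

import rmm

from dask_cuda.vmm_pool import rmm_set_current_vmm_pool


def oom(nbytes: int) -> bool:
    # Stand-in for ProxifyHostFile's spill-on-demand handler: return True to
    # retry the allocation after spilling, False to give up and raise.
    return False


rmm_set_current_vmm_pool()  # VmmPool wrapped in a CallbackMemoryResource
current_mr = rmm.mr.get_current_device_resource()
rmm.mr.set_current_device_resource(
    rmm.mr.FailureCallbackResourceAdaptor(current_mr, oom)
)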