diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 1e2991a96..f67c384d4 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -112,13 +112,25 @@ "WARNING: managed memory is currently incompatible with NVLink, " "trying to enable both will result in an exception.", ) +@click.option( + "--rmm-async/--no-rmm-async", + default=False, + show_default=True, + help="""Initialize each worker with RMM and set it to use RMM's asynchronous + allocator. See ``rmm.mr.CudaAsyncMemoryResource`` for more info. + + .. note:: + The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also + incompatible with RMM pools and managed memory, trying to enable both will + result in failure.""", +) @click.option( "--rmm-log-directory", default=None, help="Directory to write per-worker RMM log files to; the client " "and scheduler are not logged here." - "NOTE: Logging will only be enabled if --rmm-pool-size or " - "--rmm-managed-memory are specified.", + " NOTE: Logging will only be enabled if --rmm-pool-size, " + "--rmm-managed-memory, or --rmm-async are specified.", ) @click.option( "--reconnect/--no-reconnect", @@ -211,6 +223,7 @@ def main( device_memory_limit, rmm_pool_size, rmm_managed_memory, + rmm_async, rmm_log_directory, pid_file, resources, @@ -255,6 +268,7 @@ def main( device_memory_limit, rmm_pool_size, rmm_managed_memory, + rmm_async, rmm_log_directory, pid_file, resources, diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index cd452a648..557caadda 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -56,6 +56,7 @@ def __init__( device_memory_limit="auto", rmm_pool_size=None, rmm_managed_memory=False, + rmm_async=False, rmm_log_directory=None, pid_file=None, resources=None, @@ -139,6 +140,11 @@ def del_pid_file(): "For installation instructions, please see " "https://github.com/rapidsai/rmm" ) # pragma: no cover + if rmm_async: + raise ValueError( + "RMM 
pool and managed memory are incompatible with asynchronous " + "allocator" + ) if rmm_pool_size is not None: rmm_pool_size = parse_bytes(rmm_pool_size) else: @@ -212,7 +218,9 @@ def del_pid_file(): env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, plugins={ CPUAffinity(get_cpu_affinity(i)), - RMMSetup(rmm_pool_size, rmm_managed_memory, rmm_log_directory), + RMMSetup( + rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, + ), }, name=name if nprocs == 1 or not name else name + "-" + str(i), local_directory=local_directory, diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index fc82d39fc..9e2e744ea 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -116,10 +116,18 @@ class LocalCUDACluster(LocalCluster): .. warning:: Managed memory is currently incompatible with NVLink, trying to enable both will result in an exception. + rmm_async: bool, default False + Initialize each worker with RMM and set it to use RMM's asynchronous allocator. + See ``rmm.mr.CudaAsyncMemoryResource`` for more info. + + .. note:: + The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also + incompatible with RMM pools and managed memory, trying to enable both will + result in an exception. rmm_log_directory: str Directory to write per-worker RMM log files to; the client and scheduler - are not logged here. Logging will only be enabled if ``rmm_pool_size`` or - ``rmm_managed_memory`` are specified. + are not logged here. Logging will only be enabled if ``rmm_pool_size``, + ``rmm_managed_memory``, or ``rmm_async`` are specified. jit_unspill: bool If ``True``, enable just-in-time unspilling. This is experimental and doesn't support memory spilling to disk. Please see ``proxy_object.ProxyObject`` and @@ -143,10 +151,12 @@ class LocalCUDACluster(LocalCluster): If ``enable_infiniband`` or ``enable_nvlink`` is ``True`` and protocol is not ``"ucx"``. 
ValueError - If ``ucx_net_devices`` is an empty string, or if it is ``"auto"`` and UCX-Py is - not installed, or if it is ``"auto"`` and ``enable_infiniband=False``, or UCX-Py - wasn't compiled with hwloc support, or both RMM managed memory and - NVLink are enabled. + If ``ucx_net_devices=""``, if NVLink and RMM managed memory are + both enabled, if RMM pools / managed memory and asynchronous allocator are both + enabled, or if ``ucx_net_devices="auto"`` and: + + - UCX-Py is not installed or wasn't compiled with hwloc support; or + - ``enable_infiniband=False`` See Also -------- @@ -169,6 +179,7 @@ def __init__( ucx_net_devices=None, rmm_pool_size=None, rmm_managed_memory=False, + rmm_async=False, rmm_log_directory=None, jit_unspill=None, log_spilling=False, @@ -201,6 +212,7 @@ def __init__( self.rmm_pool_size = rmm_pool_size self.rmm_managed_memory = rmm_managed_memory + self.rmm_async = rmm_async if rmm_pool_size is not None or rmm_managed_memory: try: import rmm # noqa F401 @@ -210,6 +222,11 @@ def __init__( "is not available. 
For installation instructions, please " "see https://github.com/rapidsai/rmm" ) # pragma: no cover + if rmm_async: + raise ValueError( + "RMM pool and managed memory are incompatible with asynchronous " + "allocator" + ) if self.rmm_pool_size is not None: self.rmm_pool_size = parse_bytes(self.rmm_pool_size) else: @@ -332,6 +349,7 @@ def new_worker_spec(self): RMMSetup( self.rmm_pool_size, self.rmm_managed_memory, + self.rmm_async, self.rmm_log_directory, ), }, diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 49af8db7d..9ca9d8efe 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -10,8 +10,13 @@ from distributed.utils_test import loop # noqa: F401 from distributed.utils_test import popen +import rmm + from dask_cuda.utils import get_n_gpus, wait_workers +_driver_version = rmm._cuda.gpu.driverGetVersion() +_runtime_version = rmm._cuda.gpu.runtimeGetVersion() + def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8" @@ -101,6 +106,33 @@ def test_rmm_managed(loop): # noqa: F811 assert v is rmm.mr.ManagedMemoryResource +@pytest.mark.skipif( + (_driver_version < 11020 or _runtime_version < 11020), + reason="cudaMallocAsync not supported", +) +def test_rmm_async(loop): # noqa: F811 + rmm = pytest.importorskip("rmm") + with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen( + [ + "dask-cuda-worker", + "127.0.0.1:9369", + "--host", + "127.0.0.1", + "--rmm-async", + "--no-dashboard", + ] + ): + with Client("127.0.0.1:9369", loop=loop) as client: + assert wait_workers(client, n_gpus=get_n_gpus()) + + memory_resource_type = client.run( + rmm.mr.get_current_device_resource_type + ) + for v in memory_resource_type.values(): + assert v is rmm.mr.CudaAsyncMemoryResource + + def test_rmm_logging(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask-scheduler", 
"--port", "9369", "--no-dashboard"]): diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index f3b0bb1de..0237bfaed 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -6,9 +6,14 @@ from distributed.system import MEMORY_LIMIT from distributed.utils_test import gen_test +import rmm + from dask_cuda import CUDAWorker, LocalCUDACluster, utils from dask_cuda.initialize import initialize +_driver_version = rmm._cuda.gpu.driverGetVersion() +_runtime_version = rmm._cuda.gpu.runtimeGetVersion() + @gen_test(timeout=20) async def test_local_cuda_cluster(): @@ -154,6 +159,23 @@ async def test_rmm_managed(): assert v is rmm.mr.ManagedMemoryResource +@pytest.mark.skipif( + (_driver_version < 11020 or _runtime_version < 11020), + reason="cudaMallocAsync not supported", +) +@gen_test(timeout=20) +async def test_rmm_async(): + rmm = pytest.importorskip("rmm") + + async with LocalCUDACluster(rmm_async=True, asynchronous=True,) as cluster: + async with Client(cluster, asynchronous=True) as client: + memory_resource_type = await client.run( + rmm.mr.get_current_device_resource_type + ) + for v in memory_resource_type.values(): + assert v is rmm.mr.CudaAsyncMemoryResource + + @gen_test(timeout=20) async def test_rmm_logging(): rmm = pytest.importorskip("rmm") diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 14641278e..fc697a097 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -32,14 +32,25 @@ def setup(self, worker=None): class RMMSetup: - def __init__(self, nbytes, managed_memory, log_directory): + def __init__(self, nbytes, managed_memory, async_alloc, log_directory): self.nbytes = nbytes self.managed_memory = managed_memory + self.async_alloc = async_alloc self.logging = log_directory is not None self.log_directory = log_directory def setup(self, worker=None): - if self.nbytes is not None or self.managed_memory is True: + if self.async_alloc: + import rmm 
+ + rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) + if self.logging: + rmm.enable_logging( + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ) + ) + elif self.nbytes is not None or self.managed_memory: import rmm pool_allocator = False if self.nbytes is None else True