From aa96e847537922f51871d77a05fee65d44da2ada Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 9 Apr 2021 11:22:37 -0700 Subject: [PATCH 01/11] Add support for CUDA async memory resource --- dask_cuda/cli/dask_cuda_worker.py | 8 +++++++ dask_cuda/cuda_worker.py | 8 ++++++- dask_cuda/local_cuda_cluster.py | 5 +++++ dask_cuda/tests/test_dask_cuda_worker.py | 25 ++++++++++++++++++++++ dask_cuda/tests/test_local_cuda_cluster.py | 15 +++++++++++++ dask_cuda/utils.py | 8 ++++++- 6 files changed, 67 insertions(+), 2 deletions(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 1e2991a96..d0986c6b1 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -102,6 +102,12 @@ "NOTE: This size is a per worker (i.e., per GPU) configuration, " "and not cluster-wide!", ) +@click.option( + "--rmm-pool-async/--no-rmm-pool-async", + default=False, + show_default=True, + help="Use an ``rmm.mr.CudaAsyncMemoryResource`` when initializing an RMM pool.", +) @click.option( "--rmm-managed-memory/--no-rmm-managed-memory", default=False, @@ -210,6 +216,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_pool_async, rmm_managed_memory, rmm_log_directory, pid_file, @@ -254,6 +261,7 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, + rmm_pool_async, rmm_managed_memory, rmm_log_directory, pid_file, diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index cd452a648..51709740e 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -55,6 +55,7 @@ def __init__( memory_limit="auto", device_memory_limit="auto", rmm_pool_size=None, + rmm_pool_async=False, rmm_managed_memory=False, rmm_log_directory=None, pid_file=None, @@ -212,7 +213,12 @@ def del_pid_file(): env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, plugins={ CPUAffinity(get_cpu_affinity(i)), - RMMSetup(rmm_pool_size, rmm_managed_memory, rmm_log_directory), + RMMSetup( + rmm_pool_size, + rmm_pool_async, + rmm_managed_memory, + rmm_log_directory, + ), }, name=name if nprocs == 1 or not name else name + "-" + str(i), local_directory=local_directory, diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index fc82d39fc..337b5d439 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -108,6 +108,8 @@ class LocalCUDACluster(LocalCluster): .. note:: The size is a per worker (i.e., per GPU) configuration, and not cluster-wide! + rmm_pool_async: bool, default False + Use an ``rmm.mr.CudaAsyncMemoryResource`` when initializing an RMM pool. rmm_managed_memory: bool If ``True``, initialize each worker with RMM and set it to use managed memory. 
If ``False``, RMM may still be used if ``rmm_pool_size`` is specified, @@ -168,6 +170,7 @@ def __init__( enable_rdmacm=False, ucx_net_devices=None, rmm_pool_size=None, + rmm_pool_async=False, rmm_managed_memory=False, rmm_log_directory=None, jit_unspill=None, @@ -200,6 +203,7 @@ def __init__( ) self.rmm_pool_size = rmm_pool_size + self.rmm_pool_async = rmm_pool_async self.rmm_managed_memory = rmm_managed_memory if rmm_pool_size is not None or rmm_managed_memory: try: @@ -331,6 +335,7 @@ def new_worker_spec(self): CPUAffinity(get_cpu_affinity(worker_count)), RMMSetup( self.rmm_pool_size, + self.rmm_pool_async, self.rmm_managed_memory, self.rmm_log_directory, ), diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 49af8db7d..32d7d55c2 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -78,6 +78,31 @@ def test_rmm_pool(loop): # noqa: F811 assert v is rmm.mr.PoolMemoryResource +def test_rmm_pool_async(loop): # noqa: F811 + rmm = pytest.importorskip("rmm") + with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen( + [ + "dask-cuda-worker", + "127.0.0.1:9369", + "--host", + "127.0.0.1", + "--rmm-pool-size", + "2 GB", + "--rmm-pool-async", + "--no-dashboard", + ] + ): + with Client("127.0.0.1:9369", loop=loop) as client: + assert wait_workers(client, n_gpus=get_n_gpus()) + + memory_resource_type = client.run( + rmm.mr.get_current_device_resource_type + ) + for v in memory_resource_type.values(): + assert v is rmm.mr.CudaMemoryResource + + def test_rmm_managed(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index f3b0bb1de..bd932280b 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -141,6 +141,21 @@ async def test_rmm_pool(): assert v is rmm.mr.PoolMemoryResource +@gen_test(timeout=20) +async def test_rmm_pool_async(): + rmm = pytest.importorskip("rmm") + + async with LocalCUDACluster( + rmm_pool_size="2GB", rmm_pool_async=True, asynchronous=True, + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + memory_resource_type = await client.run( + rmm.mr.get_current_device_resource_type + ) + for v in memory_resource_type.values(): + assert v is rmm.mr.CudaMemoryResource + + @gen_test(timeout=20) async def test_rmm_managed(): rmm = pytest.importorskip("rmm") diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 14641278e..3af29c227 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -32,8 +32,9 @@ def setup(self, worker=None): class RMMSetup: - def __init__(self, nbytes, managed_memory, log_directory): + def __init__(self, nbytes, pool_async, managed_memory, log_directory): self.nbytes = nbytes + self.pool_async = pool_async self.managed_memory = managed_memory self.logging = log_directory is not None self.log_directory = log_directory @@ -42,6 +43,11 @@ def setup(self, worker=None): if self.nbytes is not None or self.managed_memory is True: import rmm + if self.pool_async is True: + rmm.mr.set_current_device_resource( + rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource()) + ) + pool_allocator = False if self.nbytes is None else True rmm.reinitialize( From 992b64435379f99864fc607ae1c52c93006ec3bd Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: 
Fri, 9 Apr 2021 14:16:22 -0700 Subject: [PATCH 02/11] Skip async test for <11.2 --- dask_cuda/tests/test_dask_cuda_worker.py | 11 ++++++++++- dask_cuda/tests/test_local_cuda_cluster.py | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 32d7d55c2..26990dc1b 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -10,8 +10,13 @@ from distributed.utils_test import loop # noqa: F401 from distributed.utils_test import popen +import rmm + from dask_cuda.utils import get_n_gpus, wait_workers +_driver_version = rmm._cuda.gpu.driverGetVersion() +_runtime_version = rmm._cuda.gpu.runtimeGetVersion() + def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8" @@ -78,6 +83,10 @@ def test_rmm_pool(loop): # noqa: F811 assert v is rmm.mr.PoolMemoryResource +@pytest.mark.skipif( + ((_driver_version, _runtime_version) < (11020, 11020)), + reason="cudaMallocAsync not supported", +) def test_rmm_pool_async(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): @@ -100,7 +109,7 @@ def test_rmm_pool_async(loop): # noqa: F811 rmm.mr.get_current_device_resource_type ) for v in memory_resource_type.values(): - assert v is rmm.mr.CudaMemoryResource + assert v is rmm.mr.PoolMemoryResource def test_rmm_managed(loop): # noqa: F811 diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index bd932280b..2950b67c8 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -6,9 +6,14 @@ from distributed.system import MEMORY_LIMIT from distributed.utils_test import gen_test +import rmm + from dask_cuda import CUDAWorker, LocalCUDACluster, utils from dask_cuda.initialize import initialize +_driver_version = rmm._cuda.gpu.driverGetVersion() +_runtime_version = rmm._cuda.gpu.runtimeGetVersion() + @gen_test(timeout=20) async def test_local_cuda_cluster(): @@ -141,6 +146,10 @@ async def test_rmm_pool(): assert v is rmm.mr.PoolMemoryResource +@pytest.mark.skipif( + ((_driver_version, _runtime_version) < (11020, 11020)), + reason="cudaMallocAsync not supported", +) @gen_test(timeout=20) async def test_rmm_pool_async(): rmm = pytest.importorskip("rmm") @@ -153,7 +162,7 @@ async def test_rmm_pool_async(): rmm.mr.get_current_device_resource_type ) for v in memory_resource_type.values(): - assert v is rmm.mr.CudaMemoryResource + assert v is rmm.mr.PoolMemoryResource @gen_test(timeout=20) From 568d42667b9afc1dd3c1f40bea5e09f4a98909c0 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 9 Apr 2021 15:37:19 -0700 Subject: [PATCH 03/11] Async allocator is mutually exclusive with pools / managed memory --- dask_cuda/cli/dask_cuda_worker.py | 22 ++++++++++++++-------- dask_cuda/cuda_worker.py | 7 ++----- dask_cuda/local_cuda_cluster.py | 16 +++++++++++----- dask_cuda/tests/test_dask_cuda_worker.py | 22 ++++++++++------------ dask_cuda/tests/test_local_cuda_cluster.py | 22 ++++++++++------------ dask_cuda/utils.py | 20 +++++++++++++------- 6 files changed, 60 insertions(+), 49 deletions(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index d0986c6b1..0c8532f42 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ 
-102,12 +102,6 @@ "NOTE: This size is a per worker (i.e., per GPU) configuration, " "and not cluster-wide!", ) -@click.option( - "--rmm-pool-async/--no-rmm-pool-async", - default=False, - show_default=True, - help="Use an ``rmm.mr.CudaAsyncMemoryResource`` when initializing an RMM pool.", -) @click.option( "--rmm-managed-memory/--no-rmm-managed-memory", default=False, @@ -118,6 +112,18 @@ "WARNING: managed memory is currently incompatible with NVLink, " "trying to enable both will result in an exception.", ) +@click.option( + "--rmm-async/--no-rmm-async", + default=False, + show_default=True, + help="""Initialize each worker with RMM and set it to use RMM's asynchronous + allocator. See ``rmm.mr.CudaAsyncMemoryResource`` for more info. + + .. note:: + The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also + incompatible with RMM pools and managed memory, and will be preferred over them + if both are enabled.""", +) @click.option( "--rmm-log-directory", default=None, @@ -216,8 +222,8 @@ def main( memory_limit, device_memory_limit, rmm_pool_size, - rmm_pool_async, rmm_managed_memory, + rmm_async, rmm_log_directory, pid_file, resources, @@ -261,8 +267,8 @@ memory_limit, device_memory_limit, rmm_pool_size, - rmm_pool_async, rmm_managed_memory, + rmm_async, rmm_log_directory, pid_file, resources, diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 51709740e..8fd7c84e9 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -55,8 +55,8 @@ def __init__( memory_limit="auto", device_memory_limit="auto", rmm_pool_size=None, - rmm_pool_async=False, rmm_managed_memory=False, + rmm_async=False, rmm_log_directory=None, pid_file=None, resources=None, @@ -214,10 +214,7 @@ def del_pid_file(): plugins={ CPUAffinity(get_cpu_affinity(i)), RMMSetup( - rmm_pool_size, - rmm_pool_async, - rmm_managed_memory, - rmm_log_directory, + rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, ), }, name=name if nprocs == 1 or not name else name + "-" + str(i), diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 337b5d439..43df9bdf9 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -108,8 +108,6 @@ class LocalCUDACluster(LocalCluster): .. note:: The size is a per worker (i.e., per GPU) configuration, and not cluster-wide! - rmm_pool_async: bool, default False - Use an ``rmm.mr.CudaAsyncMemoryResource`` when initializing an RMM pool. rmm_managed_memory: bool If ``True``, initialize each worker with RMM and set it to use managed memory. If ``False``, RMM may still be used if ``rmm_pool_size`` is specified, @@ -118,6 +116,14 @@ class LocalCUDACluster(LocalCluster): .. warning:: Managed memory is currently incompatible with NVLink, trying to enable both will result in an exception. + rmm_async: bool, default False + Initialize each worker with RMM and set it to use RMM's asynchronous allocator. + See ``rmm.mr.CudaAsyncMemoryResource`` for more info. + + .. note:: + The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also + incompatible with RMM pools and managed memory, and will be preferred over + them if both are enabled. rmm_log_directory: str Directory to write per-worker RMM log files to; the client and scheduler are not logged here. 
Logging will only be enabled if ``rmm_pool_size`` or @@ -170,8 +176,8 @@ def __init__( enable_rdmacm=False, ucx_net_devices=None, rmm_pool_size=None, - rmm_pool_async=False, rmm_managed_memory=False, + rmm_async=False, rmm_log_directory=None, jit_unspill=None, log_spilling=False, @@ -203,8 +209,8 @@ def __init__( ) self.rmm_pool_size = rmm_pool_size - self.rmm_pool_async = rmm_pool_async self.rmm_managed_memory = rmm_managed_memory + self.rmm_async = rmm_async if rmm_pool_size is not None or rmm_managed_memory: try: import rmm # noqa F401 @@ -335,8 +341,8 @@ def new_worker_spec(self): CPUAffinity(get_cpu_affinity(worker_count)), RMMSetup( self.rmm_pool_size, - self.rmm_pool_async, self.rmm_managed_memory, + self.rmm_async, self.rmm_log_directory, ), }, diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 26990dc1b..7150d9707 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -83,11 +83,7 @@ def test_rmm_pool(loop): # noqa: F811 assert v is rmm.mr.PoolMemoryResource -@pytest.mark.skipif( - ((_driver_version, _runtime_version) < (11020, 11020)), - reason="cudaMallocAsync not supported", -) -def test_rmm_pool_async(loop): # noqa: F811 +def test_rmm_managed(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): with popen( @@ -96,9 +92,7 @@ def test_rmm_pool_async(loop): # noqa: F811 "127.0.0.1:9369", "--host", "127.0.0.1", - "--rmm-pool-size", - "2 GB", - "--rmm-pool-async", + "--rmm-managed-memory", "--no-dashboard", ] ): @@ -109,10 +103,14 @@ def test_rmm_pool_async(loop): # noqa: F811 rmm.mr.get_current_device_resource_type ) for v in memory_resource_type.values(): - assert v is rmm.mr.PoolMemoryResource + assert v is rmm.mr.ManagedMemoryResource -def test_rmm_managed(loop): # noqa: F811 +@pytest.mark.skipif( + ((_driver_version, _runtime_version) < (11020, 11020)), + reason="cudaMallocAsync not supported", +) +def test_rmm_async(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): with popen( @@ -121,7 +119,7 @@ def test_rmm_managed(loop): # noqa: F811 "127.0.0.1:9369", "--host", "127.0.0.1", - "--rmm-managed-memory", + "--rmm-pool-async", "--no-dashboard", ] ): @@ -132,7 +130,7 @@ def test_rmm_managed(loop): # noqa: F811 rmm.mr.get_current_device_resource_type ) for v in memory_resource_type.values(): - assert v is rmm.mr.ManagedMemoryResource + assert v is rmm.mr.CudaAsyncMemoryResource def test_rmm_logging(loop): # noqa: F811 diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 2950b67c8..72ba03893 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -146,36 +146,34 @@ async def test_rmm_pool(): assert v is rmm.mr.PoolMemoryResource -@pytest.mark.skipif( - ((_driver_version, _runtime_version) < (11020, 11020)), - reason="cudaMallocAsync not supported", -) @gen_test(timeout=20) -async def test_rmm_pool_async(): +async def test_rmm_managed(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster( - rmm_pool_size="2GB", rmm_pool_async=True, asynchronous=True, - ) as cluster: + async with LocalCUDACluster(rmm_managed_memory=True, asynchronous=True,) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( rmm.mr.get_current_device_resource_type ) for v in memory_resource_type.values(): - 
assert v is rmm.mr.PoolMemoryResource + assert v is rmm.mr.ManagedMemoryResource +@pytest.mark.skipif( + ((_driver_version, _runtime_version) < (11020, 11020)), + reason="cudaMallocAsync not supported", +) @gen_test(timeout=20) -async def test_rmm_managed(): +async def test_rmm_async(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster(rmm_managed_memory=True, asynchronous=True,) as cluster: + async with LocalCUDACluster(rmm_pool_async=True, asynchronous=True,) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( rmm.mr.get_current_device_resource_type ) for v in memory_resource_type.values(): - assert v is rmm.mr.ManagedMemoryResource + assert v is rmm.mr.CudaAsyncMemoryResource @gen_test(timeout=20) diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 3af29c227..a06af3b23 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -32,21 +32,27 @@ def setup(self, worker=None): class RMMSetup: - def __init__(self, nbytes, pool_async, managed_memory, log_directory): + def __init__(self, nbytes, managed_memory, async_alloc, log_directory): self.nbytes = nbytes - self.pool_async = pool_async self.managed_memory = managed_memory + self.async_alloc = async_alloc self.logging = log_directory is not None self.log_directory = log_directory def setup(self, worker=None): - if self.nbytes is not None or self.managed_memory is True: + if self.async_alloc is True: import rmm - if self.pool_async is True: - rmm.mr.set_current_device_resource( - rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource()) - ) + rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) + + rmm.reinitialize( + logging=self.logging, + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ), + ) + elif self.nbytes is not None or self.managed_memory is True: + import rmm pool_allocator = False if self.nbytes is None else True From 4ec20ac74a0fb355885dccd182d06ee931db7773 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 07:05:37 -0700 Subject: [PATCH 04/11] Raise ValueError if pools / managed memory and async both enabled --- dask_cuda/cli/dask_cuda_worker.py | 4 ++-- dask_cuda/cuda_worker.py | 5 +++++ dask_cuda/local_cuda_cluster.py | 19 +++++++++++++------ 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 0c8532f42..dd93f6b1d 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -121,8 +121,8 @@ .. note:: The asynchronous allocator requires CUDA Toolkit 11.2 or newer. 
It is also - incompatible with RMM pools and managed memory, and will be preferred over them - if both are enabled.""", + incompatible with RMM pools and managed memory, trying to enable both will + result in an exception.""", ) @click.option( "--rmm-log-directory", diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 8fd7c84e9..461540de7 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -140,6 +140,11 @@ def del_pid_file(): "For installation instructions, please see " "https://github.com/rapidsai/rmm" ) # pragma: no cover + if rmm_async: + raise ValueError( + """RMM pool and managed memory are incompatible with asynchronous + allocator""" + ) if rmm_pool_size is not None: rmm_pool_size = parse_bytes(rmm_pool_size) else: diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 43df9bdf9..4da259a55 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -122,8 +122,8 @@ class LocalCUDACluster(LocalCluster): .. note:: The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also - incompatible with RMM pools and managed memory, and will be preferred over - them if both are enabled. + incompatible with RMM pools and managed memory, trying to enable both will + result in an exception. rmm_log_directory: str Directory to write per-worker RMM log files to; the client and scheduler are not logged here. Logging will only be enabled if ``rmm_pool_size`` or @@ -151,10 +151,12 @@ class LocalCUDACluster(LocalCluster): If ``enable_infiniband`` or ``enable_nvlink`` is ``True`` and protocol is not ``"ucx"``. ValueError - If ``ucx_net_devices`` is an empty string, or if it is ``"auto"`` and UCX-Py is - not installed, or if it is ``"auto"`` and ``enable_infiniband=False``, or UCX-Py - wasn't compiled with hwloc support, or both RMM managed memory and - NVLink are enabled. + If ``ucx_net_devices=""``, if NVLink and RMM managed memory are + both enabled, if RMM pools / managed memory and asynchronous allocator are both + enabled, or if ``ucx_net_devices="auto"`` and: + + - UCX-Py is not installed or wasn't compiled with hwloc support or + - ``enable_infiniband=False`` See Also -------- @@ -220,6 +222,11 @@ def __init__( "is not available. 
For installation instructions, please " "see https://github.com/rapidsai/rmm" ) # pragma: no cover + if self.rmm_async: + raise ValueError( + """RMM pool and managed memory are incompatible with asynchronous + allocator""" + ) if self.rmm_pool_size is not None: self.rmm_pool_size = parse_bytes(self.rmm_pool_size) else: From d5d446b563b380361f68936a2b62c78336f5d6e8 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 07:08:38 -0700 Subject: [PATCH 05/11] Don't reinitialize for RMM async --- dask_cuda/utils.py | 38 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index a06af3b23..cfbf1fcb4 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -40,31 +40,23 @@ def __init__(self, nbytes, managed_memory, async_alloc, log_directory): self.log_directory = log_directory def setup(self, worker=None): - if self.async_alloc is True: + if self.nbytes is not None or self.managed_memory or self.async_alloc: import rmm - rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) - - rmm.reinitialize( - logging=self.logging, - log_file_name=get_rmm_log_file_name( - worker, self.logging, self.log_directory - ), - ) - elif self.nbytes is not None or self.managed_memory is True: - import rmm - - pool_allocator = False if self.nbytes is None else True - - rmm.reinitialize( - pool_allocator=pool_allocator, - managed_memory=self.managed_memory, - initial_pool_size=self.nbytes, - logging=self.logging, - log_file_name=get_rmm_log_file_name( - worker, self.logging, self.log_directory - ), - ) + if self.async_alloc: + rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) + else: + pool_allocator = False if self.nbytes is None else True + + rmm.reinitialize( + pool_allocator=pool_allocator, + managed_memory=self.managed_memory, + initial_pool_size=self.nbytes, + logging=self.logging, + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ), + ) def unpack_bitmask(x, mask_bits=64): From 56d6b841106fc5109e409d7a08daa8dd50e46470 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 08:43:59 -0700 Subject: [PATCH 06/11] Change kwarg in tests --- dask_cuda/tests/test_dask_cuda_worker.py | 2 +- dask_cuda/tests/test_local_cuda_cluster.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 7150d9707..9ca9d8efe 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -119,7 +119,7 @@ def test_rmm_async(loop): # noqa: F811 "127.0.0.1:9369", "--host", "127.0.0.1", - "--rmm-pool-async", + "--rmm-async", "--no-dashboard", ] ): diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 72ba03893..0237bfaed 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -167,7 +167,7 @@ async def test_rmm_managed(): async def test_rmm_async(): rmm = pytest.importorskip("rmm") - async with LocalCUDACluster(rmm_pool_async=True, asynchronous=True,) as cluster: + async with LocalCUDACluster(rmm_async=True, asynchronous=True,) as cluster: async with Client(cluster, asynchronous=True) as client: memory_resource_type = await client.run( rmm.mr.get_current_device_resource_type From 
359746f90f46596e16bce80412237043037248b1 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 08:49:11 -0700 Subject: [PATCH 07/11] Add logging --- dask_cuda/cli/dask_cuda_worker.py | 4 ++-- dask_cuda/local_cuda_cluster.py | 4 ++-- dask_cuda/utils.py | 6 ++++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index dd93f6b1d..930220ca5 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -129,8 +129,8 @@ default=None, help="Directory to write per-worker RMM log files to; the client " "and scheduler are not logged here." - "NOTE: Logging will only be enabled if --rmm-pool-size or " - "--rmm-managed-memory are specified.", + "NOTE: Logging will only be enabled if --rmm-pool-size, " + "--rmm-managed-memory, or --rmm-async are specified.", ) @click.option( "--reconnect/--no-reconnect", diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 4da259a55..3e41de0a9 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -126,8 +126,8 @@ class LocalCUDACluster(LocalCluster): result in an exception. rmm_log_directory: str Directory to write per-worker RMM log files to; the client and scheduler - are not logged here. Logging will only be enabled if ``rmm_pool_size`` or - ``rmm_managed_memory`` are specified. + are not logged here. Logging will only be enabled if ``rmm_pool_size``, + ``rmm_managed_memory``, or ``rmm_async`` are specified. jit_unspill: bool If ``True``, enable just-in-time unspilling. This is experimental and doesn't support memory spilling to disk. Please see ``proxy_object.ProxyObject`` and diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index cfbf1fcb4..5e46f28b9 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -45,6 +45,12 @@ def setup(self, worker=None): if self.async_alloc: rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) + if self.logging: + rmm.enable_logging( + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ) + ) else: pool_allocator = False if self.nbytes is None else True From fb89979e4249ee5ea21bf5966905fccb10d4deab Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 09:17:53 -0700 Subject: [PATCH 08/11] Go back to mutually exclusive async / pool setup --- dask_cuda/utils.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 5e46f28b9..fc697a097 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -40,29 +40,30 @@ def __init__(self, nbytes, managed_memory, async_alloc, log_directory): self.log_directory = log_directory def setup(self, worker=None): - if self.nbytes is not None or self.managed_memory or self.async_alloc: + if self.async_alloc: import rmm - if self.async_alloc: - rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) - if self.logging: - rmm.enable_logging( - log_file_name=get_rmm_log_file_name( - worker, self.logging, self.log_directory - ) - ) - else: - pool_allocator = False if self.nbytes is None else True - - rmm.reinitialize( - pool_allocator=pool_allocator, - managed_memory=self.managed_memory, - initial_pool_size=self.nbytes, - logging=self.logging, + rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) + if self.logging: + 
rmm.enable_logging( log_file_name=get_rmm_log_file_name( worker, self.logging, self.log_directory - ), + ) ) + elif self.nbytes is not None or self.managed_memory: + import rmm + + pool_allocator = False if self.nbytes is None else True + + rmm.reinitialize( + pool_allocator=pool_allocator, + managed_memory=self.managed_memory, + initial_pool_size=self.nbytes, + logging=self.logging, + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ), + ) def unpack_bitmask(x, mask_bits=64): From e67befddf2c20c909886e45274fcc10e8d59e181 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 11:34:42 -0700 Subject: [PATCH 09/11] Fix cluster docstring --- dask_cuda/local_cuda_cluster.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 3e41de0a9..41badf5de 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -151,12 +151,12 @@ class LocalCUDACluster(LocalCluster): If ``enable_infiniband`` or ``enable_nvlink`` is ``True`` and protocol is not ``"ucx"``. ValueError - If ``ucx_net_devices=""``, if NVLink and RMM managed memory are - both enabled, if RMM pools / managed memory and asynchronous allocator are both - enabled, or if ``ucx_net_devices="auto"`` and: + If ``ucx_net_devices=""``, if NVLink and RMM managed memory are + both enabled, if RMM pools / managed memory and asynchronous allocator are both + enabled, or if ``ucx_net_devices="auto"`` and: - - UCX-Py is not installed or wasn't compiled with hwloc support or - - ``enable_infiniband=False`` + - UCX-Py is not installed or wasn't compiled with hwloc support; or + - ``enable_infiniband=False`` See Also -------- From e20e0ed5351bb2f796d3c6ed2df5b6a708f773fa Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 11:47:08 -0700 Subject: [PATCH 10/11] Fix ValueError message --- dask_cuda/cuda_worker.py | 4 ++-- dask_cuda/local_cuda_cluster.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 461540de7..557caadda 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -142,8 +142,8 @@ def del_pid_file(): ) # pragma: no cover if rmm_async: raise ValueError( - """RMM pool and managed memory are incompatible with asynchronous - allocator""" + "RMM pool and managed memory are incompatible with asynchronous " + "allocator" ) if rmm_pool_size is not None: rmm_pool_size = parse_bytes(rmm_pool_size) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 41badf5de..9e2e744ea 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -222,10 +222,10 @@ def __init__( "is not available. 
For installation instructions, please " "see https://github.com/rapidsai/rmm" ) # pragma: no cover - if self.rmm_async: + if rmm_async: raise ValueError( - """RMM pool and managed memory are incompatible with asynchronous - allocator""" + "RMM pool and managed memory are incompatible with asynchronous " + "allocator" ) if self.rmm_pool_size is not None: self.rmm_pool_size = parse_bytes(self.rmm_pool_size) From 6a964320cc9bfcfb031b92943b2da14b2b462264 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 12 Apr 2021 11:54:32 -0700 Subject: [PATCH 11/11] Clarify failure in CLI --- dask_cuda/cli/dask_cuda_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 930220ca5..f67c384d4 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -122,7 +122,7 @@ .. note:: The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory, trying to enable both will - result in an exception.""", + result in failure.""", ) @click.option( "--rmm-log-directory",
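
Taken together, the series ends up exposing a single flag: ``--rmm-async`` on the ``dask-cuda-worker`` CLI and ``rmm_async=True`` on ``LocalCUDACluster``/``CUDAWorker``. It selects ``rmm.mr.CudaAsyncMemoryResource`` on every worker, requires CUDA Toolkit 11.2 or newer, and fails with a ``ValueError`` when combined with ``rmm_pool_size`` or ``rmm_managed_memory``. Below is a minimal usage sketch, not part of the patches themselves; it assumes the series is applied and that RMM and a CUDA driver/runtime of at least 11.2 are available (the same version probes the tests above rely on):

    # Illustrative sketch only: exercises the rmm_async keyword added by this series.
    from dask.distributed import Client

    import rmm

    from dask_cuda import LocalCUDACluster

    # Same version checks the tests above use to skip on older toolkits.
    driver = rmm._cuda.gpu.driverGetVersion()
    runtime = rmm._cuda.gpu.runtimeGetVersion()
    assert (driver, runtime) >= (11020, 11020), "cudaMallocAsync needs CUDA 11.2+"

    if __name__ == "__main__":
        # rmm_async is mutually exclusive with rmm_pool_size/rmm_managed_memory;
        # combining them raises the ValueError added in PATCH 04.
        with LocalCUDACluster(rmm_async=True) as cluster:
            with Client(cluster) as client:
                # Each worker should now report the async allocator as its
                # current device resource, mirroring test_rmm_async above.
                resource_types = client.run(rmm.mr.get_current_device_resource_type)
                assert all(
                    v is rmm.mr.CudaAsyncMemoryResource
                    for v in resource_types.values()
                )

The CLI equivalent is ``dask-cuda-worker <scheduler-address> --rmm-async``, as exercised by ``test_rmm_async`` in ``test_dask_cuda_worker.py``.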