Add support for CudaAsyncMemoryResource (#566)
Closes #565

Adds the `--rmm-async`/`rmm_async` option to the CLI and cluster to enable the use of `rmm.mr.CudaAsyncMemoryResource` in RMM initialization.
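
For illustration, a minimal usage sketch based on the tests added in this commit; the cluster/client setup here is an assumption for the example, and CUDA Toolkit 11.2+ plus a recent RMM are required:

```python
# Enable RMM's asynchronous allocator on every worker of a local cluster,
# then confirm which memory resource each worker ended up with.
# CLI equivalent: dask-cuda-worker <scheduler-address> --rmm-async
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import rmm

cluster = LocalCUDACluster(rmm_async=True)
client = Client(cluster)

# Each worker should now report CudaAsyncMemoryResource as its current resource.
resource_types = client.run(rmm.mr.get_current_device_resource_type)
assert all(v is rmm.mr.CudaAsyncMemoryResource for v in resource_types.values())
```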

Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #566
charlesbluca authored Apr 12, 2021
1 parent 1526017 commit fde564e
Showing 6 changed files with 116 additions and 11 deletions.
18 changes: 16 additions & 2 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -112,13 +112,25 @@
"WARNING: managed memory is currently incompatible with NVLink, "
"trying to enable both will result in an exception.",
)
@click.option(
"--rmm-async/--no-rmm-async",
default=False,
show_default=True,
help="""Initialize each worker withh RMM and set it to use RMM's asynchronous
allocator. See ``rmm.mr.CudaAsyncMemoryResource`` for more info.
.. note::
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
incompatible with RMM pools and managed memory; trying to enable both will
result in failure.""",
)
@click.option(
"--rmm-log-directory",
default=None,
help="Directory to write per-worker RMM log files to; the client "
"and scheduler are not logged here."
"NOTE: Logging will only be enabled if --rmm-pool-size or "
"--rmm-managed-memory are specified.",
"NOTE: Logging will only be enabled if --rmm-pool-size, "
"--rmm-managed-memory, or --rmm-async are specified.",
)
@click.option(
"--reconnect/--no-reconnect",
@@ -211,6 +223,7 @@ def main(
device_memory_limit,
rmm_pool_size,
rmm_managed_memory,
rmm_async,
rmm_log_directory,
pid_file,
resources,
@@ -255,6 +268,7 @@ def main(
device_memory_limit,
rmm_pool_size,
rmm_managed_memory,
rmm_async,
rmm_log_directory,
pid_file,
resources,
10 changes: 9 additions & 1 deletion dask_cuda/cuda_worker.py
Expand Up @@ -56,6 +56,7 @@ def __init__(
device_memory_limit="auto",
rmm_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
pid_file=None,
resources=None,
@@ -139,6 +140,11 @@ def del_pid_file():
"For installation instructions, please see "
"https://github.com/rapidsai/rmm"
) # pragma: no cover
if rmm_async:
raise ValueError(
"RMM pool and managed memory are incompatible with asynchronous "
"allocator"
)
if rmm_pool_size is not None:
rmm_pool_size = parse_bytes(rmm_pool_size)
else:
@@ -212,7 +218,9 @@ def del_pid_file():
env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
plugins={
CPUAffinity(get_cpu_affinity(i)),
RMMSetup(
rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory,
),
},
name=name if nprocs == 1 or not name else name + "-" + str(i),
local_directory=local_directory,
30 changes: 24 additions & 6 deletions dask_cuda/local_cuda_cluster.py
Expand Up @@ -116,10 +116,18 @@ class LocalCUDACluster(LocalCluster):
.. warning::
Managed memory is currently incompatible with NVLink, trying to enable
both will result in an exception.
rmm_async: bool, default False
Initialize each worker with RMM and set it to use RMM's asynchronous allocator.
See ``rmm.mr.CudaAsyncMemoryResource`` for more info.
.. note::
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
incompatible with RMM pools and managed memory; trying to enable both will
result in an exception.
rmm_log_directory: str
Directory to write per-worker RMM log files to; the client and scheduler
are not logged here. Logging will only be enabled if ``rmm_pool_size``,
``rmm_managed_memory``, or ``rmm_async`` are specified.
jit_unspill: bool
If ``True``, enable just-in-time unspilling. This is experimental and doesn't
support memory spilling to disk. Please see ``proxy_object.ProxyObject`` and
@@ -143,10 +151,12 @@ class LocalCUDACluster(LocalCluster):
If ``enable_infiniband`` or ``enable_nvlink`` is ``True`` and protocol is not
``"ucx"``.
ValueError
If ``ucx_net_devices=""``, if NVLink and RMM managed memory are
both enabled, if RMM pools / managed memory and asynchronous allocator are both
enabled, or if ``ucx_net_devices="auto"`` and:
- UCX-Py is not installed or wasn't compiled with hwloc support; or
- ``enable_infiniband=False``
See Also
--------
@@ -169,6 +179,7 @@ def __init__(
ucx_net_devices=None,
rmm_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
jit_unspill=None,
log_spilling=False,
@@ -201,6 +212,7 @@ def __init__(

self.rmm_pool_size = rmm_pool_size
self.rmm_managed_memory = rmm_managed_memory
self.rmm_async = rmm_async
if rmm_pool_size is not None or rmm_managed_memory:
try:
import rmm # noqa F401
@@ -210,6 +222,11 @@ def __init__(
"is not available. For installation instructions, please "
"see https://github.com/rapidsai/rmm"
) # pragma: no cover
if rmm_async:
raise ValueError(
"RMM pool and managed memory are incompatible with asynchronous "
"allocator"
)
if self.rmm_pool_size is not None:
self.rmm_pool_size = parse_bytes(self.rmm_pool_size)
else:
@@ -332,6 +349,7 @@ def new_worker_spec(self):
RMMSetup(
self.rmm_pool_size,
self.rmm_managed_memory,
self.rmm_async,
self.rmm_log_directory,
),
},
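
A note on the validation added in this file: requesting an RMM pool or managed memory together with `rmm_async` is rejected up front. A small sketch of the expected behavior (pool size chosen arbitrarily; assumes a machine with at least one GPU and RMM installed):

```python
# rmm_async cannot be combined with an RMM pool or managed memory;
# the constructor raises ValueError before any workers are started.
from dask_cuda import LocalCUDACluster

try:
    LocalCUDACluster(rmm_pool_size="1GB", rmm_async=True)
except ValueError as err:
    print(err)  # RMM pool and managed memory are incompatible with asynchronous allocator
```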
32 changes: 32 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -10,8 +10,13 @@
from distributed.utils_test import loop # noqa: F401
from distributed.utils_test import popen

import rmm

from dask_cuda.utils import get_n_gpus, wait_workers

_driver_version = rmm._cuda.gpu.driverGetVersion()
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()


def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8"
@@ -101,6 +106,33 @@ def test_rmm_managed(loop): # noqa: F811
assert v is rmm.mr.ManagedMemoryResource


@pytest.mark.skipif(
((_driver_version, _runtime_version) < (11020, 11020)),
reason="cudaMallocAsync not supported",
)
def test_rmm_async(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9369",
"--host",
"127.0.0.1",
"--rmm-async",
"--no-dashboard",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

memory_resource_type = client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.CudaAsyncMemoryResource


def test_rmm_logging(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
22 changes: 22 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
@@ -6,9 +6,14 @@
from distributed.system import MEMORY_LIMIT
from distributed.utils_test import gen_test

import rmm

from dask_cuda import CUDAWorker, LocalCUDACluster, utils
from dask_cuda.initialize import initialize

_driver_version = rmm._cuda.gpu.driverGetVersion()
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()


@gen_test(timeout=20)
async def test_local_cuda_cluster():
@@ -154,6 +159,23 @@ async def test_rmm_managed():
assert v is rmm.mr.ManagedMemoryResource


@pytest.mark.skipif(
((_driver_version, _runtime_version) < (11020, 11020)),
reason="cudaMallocAsync not supported",
)
@gen_test(timeout=20)
async def test_rmm_async():
rmm = pytest.importorskip("rmm")

async with LocalCUDACluster(rmm_async=True, asynchronous=True,) as cluster:
async with Client(cluster, asynchronous=True) as client:
memory_resource_type = await client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.CudaAsyncMemoryResource


@gen_test(timeout=20)
async def test_rmm_logging():
rmm = pytest.importorskip("rmm")
15 changes: 13 additions & 2 deletions dask_cuda/utils.py
Expand Up @@ -32,14 +32,25 @@ def setup(self, worker=None):


class RMMSetup:
def __init__(self, nbytes, managed_memory, async_alloc, log_directory):
self.nbytes = nbytes
self.managed_memory = managed_memory
self.async_alloc = async_alloc
self.logging = log_directory is not None
self.log_directory = log_directory

def setup(self, worker=None):
if self.async_alloc:
import rmm

rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
if self.logging:
rmm.enable_logging(
log_file_name=get_rmm_log_file_name(
worker, self.logging, self.log_directory
)
)
elif self.nbytes is not None or self.managed_memory:
import rmm

pool_allocator = False if self.nbytes is None else True
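
For reference, the core of the new `async_alloc` branch is a device-resource swap; a standalone sketch outside of Dask (assumes CUDA Toolkit 11.2+ and a driver that supports `cudaMallocAsync`):

```python
# Route RMM allocations on the current device through cudaMallocAsync by
# installing CudaAsyncMemoryResource, as RMMSetup does when async_alloc=True.
import rmm

rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
assert rmm.mr.get_current_device_resource_type() is rmm.mr.CudaAsyncMemoryResource

# Subsequent device allocations go through the asynchronous allocator.
buf = rmm.DeviceBuffer(size=1 << 20)  # 1 MiB
```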
