Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for CudaAsyncMemoryResource #566

Merged
merged 11 commits into from
Apr 12, 2021
18 changes: 16 additions & 2 deletions dask_cuda/cli/dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,25 @@
"WARNING: managed memory is currently incompatible with NVLink, "
"trying to enable both will result in an exception.",
)
@click.option(
"--rmm-async/--no-rmm-async",
default=False,
show_default=True,
help="""Initialize each worker withh RMM and set it to use RMM's asynchronous
allocator. See ``rmm.mr.CudaAsyncMemoryResource`` for more info.

.. note::
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
incompatible with RMM pools and managed memory, trying to enable both will
result in failure.""",
)
@click.option(
"--rmm-log-directory",
default=None,
help="Directory to write per-worker RMM log files to; the client "
"and scheduler are not logged here."
"NOTE: Logging will only be enabled if --rmm-pool-size or "
"--rmm-managed-memory are specified.",
"NOTE: Logging will only be enabled if --rmm-pool-size, "
"--rmm-managed-memory, or --rmm-async are specified.",
)
@click.option(
"--reconnect/--no-reconnect",
Expand Down Expand Up @@ -211,6 +223,7 @@ def main(
device_memory_limit,
rmm_pool_size,
rmm_managed_memory,
rmm_async,
rmm_log_directory,
pid_file,
resources,
Expand Down Expand Up @@ -255,6 +268,7 @@ def main(
device_memory_limit,
rmm_pool_size,
rmm_managed_memory,
rmm_async,
rmm_log_directory,
pid_file,
resources,
Expand Down
10 changes: 9 additions & 1 deletion dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
device_memory_limit="auto",
rmm_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
pid_file=None,
resources=None,
Expand Down Expand Up @@ -139,6 +140,11 @@ def del_pid_file():
"For installation instructions, please see "
"https://github.com/rapidsai/rmm"
) # pragma: no cover
if rmm_async:
raise ValueError(
"RMM pool and managed memory are incompatible with asynchronous "
"allocator"
)
if rmm_pool_size is not None:
rmm_pool_size = parse_bytes(rmm_pool_size)
else:
Expand Down Expand Up @@ -212,7 +218,9 @@ def del_pid_file():
env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
plugins={
CPUAffinity(get_cpu_affinity(i)),
RMMSetup(rmm_pool_size, rmm_managed_memory, rmm_log_directory),
RMMSetup(
rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory,
),
},
name=name if nprocs == 1 or not name else name + "-" + str(i),
local_directory=local_directory,
Expand Down
30 changes: 24 additions & 6 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,18 @@ class LocalCUDACluster(LocalCluster):
.. warning::
Managed memory is currently incompatible with NVLink, trying to enable
both will result in an exception.
rmm_async: bool, default False
Initialize each worker withh RMM and set it to use RMM's asynchronous allocator.
See ``rmm.mr.CudaAsyncMemoryResource`` for more info.

.. note::
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
incompatible with RMM pools and managed memory, trying to enable both will
result in an exception.
rmm_log_directory: str
Directory to write per-worker RMM log files to; the client and scheduler
are not logged here. Logging will only be enabled if ``rmm_pool_size`` or
``rmm_managed_memory`` are specified.
are not logged here. Logging will only be enabled if ``rmm_pool_size``,
``rmm_managed_memory``, or ``rmm_async`` are specified.
jit_unspill: bool
If ``True``, enable just-in-time unspilling. This is experimental and doesn't
support memory spilling to disk. Please see ``proxy_object.ProxyObject`` and
Expand All @@ -143,10 +151,12 @@ class LocalCUDACluster(LocalCluster):
If ``enable_infiniband`` or ``enable_nvlink`` is ``True`` and protocol is not
``"ucx"``.
ValueError
If ``ucx_net_devices`` is an empty string, or if it is ``"auto"`` and UCX-Py is
not installed, or if it is ``"auto"`` and ``enable_infiniband=False``, or UCX-Py
wasn't compiled with hwloc support, or both RMM managed memory and
NVLink are enabled.
If ``ucx_net_devices=""``, if NVLink and RMM managed memory are
both enabled, if RMM pools / managed memory and asynchronous allocator are both
enabled, or if ``ucx_net_devices="auto"`` and:

- UCX-Py is not installed or wasn't compiled with hwloc support; or
- ``enable_infiniband=False``

See Also
--------
Expand All @@ -169,6 +179,7 @@ def __init__(
ucx_net_devices=None,
rmm_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
jit_unspill=None,
log_spilling=False,
Expand Down Expand Up @@ -201,6 +212,7 @@ def __init__(

self.rmm_pool_size = rmm_pool_size
self.rmm_managed_memory = rmm_managed_memory
self.rmm_async = rmm_async
if rmm_pool_size is not None or rmm_managed_memory:
try:
import rmm # noqa F401
Expand All @@ -210,6 +222,11 @@ def __init__(
"is not available. For installation instructions, please "
"see https://github.com/rapidsai/rmm"
) # pragma: no cover
if rmm_async:
raise ValueError(
"RMM pool and managed memory are incompatible with asynchronous "
"allocator"
)
if self.rmm_pool_size is not None:
self.rmm_pool_size = parse_bytes(self.rmm_pool_size)
else:
Expand Down Expand Up @@ -332,6 +349,7 @@ def new_worker_spec(self):
RMMSetup(
self.rmm_pool_size,
self.rmm_managed_memory,
self.rmm_async,
self.rmm_log_directory,
),
},
Expand Down
32 changes: 32 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
from distributed.utils_test import loop # noqa: F401
from distributed.utils_test import popen

import rmm

from dask_cuda.utils import get_n_gpus, wait_workers

_driver_version = rmm._cuda.gpu.driverGetVersion()
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()


def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8"
Expand Down Expand Up @@ -101,6 +106,33 @@ def test_rmm_managed(loop): # noqa: F811
assert v is rmm.mr.ManagedMemoryResource


@pytest.mark.skipif(
((_driver_version, _runtime_version) < (11020, 11020)),
reason="cudaMallocAsync not supported",
)
def test_rmm_async(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9369",
"--host",
"127.0.0.1",
"--rmm-async",
"--no-dashboard",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

memory_resource_type = client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.CudaAsyncMemoryResource


def test_rmm_logging(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
Expand Down
22 changes: 22 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
from distributed.system import MEMORY_LIMIT
from distributed.utils_test import gen_test

import rmm

from dask_cuda import CUDAWorker, LocalCUDACluster, utils
from dask_cuda.initialize import initialize

_driver_version = rmm._cuda.gpu.driverGetVersion()
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()


@gen_test(timeout=20)
async def test_local_cuda_cluster():
Expand Down Expand Up @@ -154,6 +159,23 @@ async def test_rmm_managed():
assert v is rmm.mr.ManagedMemoryResource


@pytest.mark.skipif(
((_driver_version, _runtime_version) < (11020, 11020)),
reason="cudaMallocAsync not supported",
)
@gen_test(timeout=20)
async def test_rmm_async():
rmm = pytest.importorskip("rmm")

async with LocalCUDACluster(rmm_async=True, asynchronous=True,) as cluster:
async with Client(cluster, asynchronous=True) as client:
memory_resource_type = await client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.CudaAsyncMemoryResource


@gen_test(timeout=20)
async def test_rmm_logging():
rmm = pytest.importorskip("rmm")
Expand Down
15 changes: 13 additions & 2 deletions dask_cuda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,25 @@ def setup(self, worker=None):


class RMMSetup:
def __init__(self, nbytes, managed_memory, log_directory):
def __init__(self, nbytes, managed_memory, async_alloc, log_directory):
self.nbytes = nbytes
self.managed_memory = managed_memory
self.async_alloc = async_alloc
self.logging = log_directory is not None
self.log_directory = log_directory

def setup(self, worker=None):
if self.nbytes is not None or self.managed_memory is True:
if self.async_alloc:
import rmm

rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
if self.logging:
rmm.enable_logging(
log_file_name=get_rmm_log_file_name(
worker, self.logging, self.log_directory
)
)
charlesbluca marked this conversation as resolved.
Show resolved Hide resolved
elif self.nbytes is not None or self.managed_memory:
import rmm

pool_allocator = False if self.nbytes is None else True
Expand Down