Skip to content

Commit

Permalink
Add option to track RMM allocations (#842)
Browse files Browse the repository at this point in the history
Adds the `rmm_track_allocations` option that enables workers to query the amount of RMM memory allocated at any time via `mr.get_allocated_bytes()`.

This is used in dask/distributed#5740.

Authors:
  - Ashwin Srinath (https://github.com/shwina)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #842
  • Loading branch information
shwina authored Feb 25, 2022
1 parent 06decc6 commit f09c7c5
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 0 deletions.
10 changes: 10 additions & 0 deletions dask_cuda/cli/dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@
Logging will only be enabled if ``--rmm-pool-size`` or ``--rmm-managed-memory``
are specified.""",
)
@click.option(
"--rmm-track-allocations/--no-rmm-track-allocations",
default=False,
show_default=True,
help="""Track memory allocations made by RMM. If ``True``, wraps the memory
resource of each worker with a ``rmm.mr.TrackingResourceAdaptor`` that
allows querying the amount of memory allocated by RMM.""",
)
@click.option(
"--pid-file", type=str, default="", help="File to write the process PID.",
)
Expand Down Expand Up @@ -281,6 +289,7 @@ def main(
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
pid_file,
resources,
dashboard,
Expand Down Expand Up @@ -332,6 +341,7 @@ def main(
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
pid_file,
resources,
dashboard,
Expand Down
2 changes: 2 additions & 0 deletions dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
rmm_track_allocations=False,
pid_file=None,
resources=None,
dashboard=True,
Expand Down Expand Up @@ -223,6 +224,7 @@ def del_pid_file():
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
),
PreImport(pre_import),
},
Expand Down
13 changes: 13 additions & 0 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ class LocalCUDACluster(LocalCluster):
.. note::
Logging will only be enabled if ``rmm_pool_size`` is specified or
``rmm_managed_memory=True``.
rmm_track_allocations : bool, default False
If True, wraps the memory resource used by each worker with a
``rmm.mr.TrackingResourceAdaptor``, which tracks the amount of
memory allocated.
.. note::
This option enables additional diagnostics to be collected and
reported by the Dask dashboard. However, there is significant overhead
associated with this and it should only be used for debugging and
memory profiling.
jit_unspill : bool or None, default None
Enable just-in-time unspilling. Can be a boolean or ``None`` to fall back on
the value of ``dask.jit-unspill`` in the local Dask configuration, disabling
Expand Down Expand Up @@ -195,6 +205,7 @@ def __init__(
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
rmm_track_allocations=False,
jit_unspill=None,
log_spilling=False,
worker_class=None,
Expand Down Expand Up @@ -258,6 +269,7 @@ def __init__(
)

self.rmm_log_directory = rmm_log_directory
self.rmm_track_allocations = rmm_track_allocations

if not kwargs.pop("processes", True):
raise ValueError(
Expand Down Expand Up @@ -377,6 +389,7 @@ def new_worker_spec(self):
self.rmm_managed_memory,
self.rmm_async,
self.rmm_log_directory,
self.rmm_track_allocations,
),
PreImport(self.pre_import),
},
Expand Down
31 changes: 31 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,34 @@ def test_cuda_visible_devices_uuid(loop): # noqa: F811

result = client.run(lambda: os.environ["CUDA_VISIBLE_DEVICES"])
assert list(result.values())[0] == gpu_uuid


def test_rmm_track_allocations(loop):  # noqa: F811
    """Check that ``--rmm-track-allocations`` wraps each worker's RMM
    memory resource in a ``TrackingResourceAdaptor`` around the pool."""
    rmm = pytest.importorskip("rmm")
    scheduler_cmd = ["dask-scheduler", "--port", "9369", "--no-dashboard"]
    worker_cmd = [
        "dask-cuda-worker",
        "127.0.0.1:9369",
        "--host",
        "127.0.0.1",
        "--rmm-pool-size",
        "2 GB",
        "--no-dashboard",
        "--rmm-track-allocations",
    ]
    with popen(scheduler_cmd), popen(worker_cmd):
        with Client("127.0.0.1:9369", loop=loop) as client:
            assert wait_workers(client, n_gpus=get_n_gpus())

            # Every worker's current device resource must be the
            # tracking adaptor installed by the CLI flag.
            mr_types = client.run(rmm.mr.get_current_device_resource_type)
            assert all(
                t is rmm.mr.TrackingResourceAdaptor for t in mr_types.values()
            )

            # The adaptor must wrap the pool created by --rmm-pool-size.
            upstream_types = client.run(
                lambda: type(rmm.mr.get_current_device_resource().upstream_mr)
            )
            assert all(
                t is rmm.mr.PoolMemoryResource for t in upstream_types.values()
            )
20 changes: 20 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,23 @@ async def test_gpu_uuid():

result = await client.run(lambda: os.environ["CUDA_VISIBLE_DEVICES"])
assert list(result.values())[0] == gpu_uuid


@gen_test(timeout=20)
async def test_rmm_track_allocations():
    """Check that ``rmm_track_allocations=True`` wraps each worker's RMM
    memory resource in a ``TrackingResourceAdaptor`` around the pool."""
    rmm = pytest.importorskip("rmm")
    async with LocalCUDACluster(
        rmm_pool_size="2GB", asynchronous=True, rmm_track_allocations=True
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            # Every worker's current device resource must be the
            # tracking adaptor installed by the cluster option.
            mr_types = await client.run(
                rmm.mr.get_current_device_resource_type
            )
            assert all(
                t is rmm.mr.TrackingResourceAdaptor for t in mr_types.values()
            )

            # The adaptor must wrap the pool created by rmm_pool_size.
            upstream_types = await client.run(
                lambda: type(rmm.mr.get_current_device_resource().upstream_mr)
            )
            assert all(
                t is rmm.mr.PoolMemoryResource for t in upstream_types.values()
            )
7 changes: 7 additions & 0 deletions dask_cuda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
managed_memory,
async_alloc,
log_directory,
track_allocations,
):
if initial_pool_size is None and maximum_pool_size is not None:
raise ValueError(
Expand All @@ -56,6 +57,7 @@ def __init__(
self.async_alloc = async_alloc
self.logging = log_directory is not None
self.log_directory = log_directory
self.rmm_track_allocations = track_allocations

def setup(self, worker=None):
if self.async_alloc:
Expand Down Expand Up @@ -83,6 +85,11 @@ def setup(self, worker=None):
worker, self.logging, self.log_directory
),
)
if self.rmm_track_allocations:
import rmm

mr = rmm.mr.get_current_device_resource()
rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))


class PreImport:
Expand Down

0 comments on commit f09c7c5

Please sign in to comment.