Skip to content

Commit

Permalink
Extend RMM async allocation support (#1116)
Browse files Browse the repository at this point in the history
Ensure pool size argument is respected when enabling RMM async allocator, add release threshold support.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #1116
  • Loading branch information
pentschev authored Feb 28, 2023
1 parent b9561cf commit 3fadc30
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 31 deletions.
1 change: 1 addition & 0 deletions dask_cuda/benchmarks/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def run(client: Client, args: Namespace, config: Config):
args.disable_rmm_pool,
args.enable_rmm_async,
args.enable_rmm_managed,
args.rmm_release_threshold,
args.rmm_log_directory,
args.enable_rmm_statistics,
)
Expand Down
19 changes: 18 additions & 1 deletion dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
action="store_true",
help="Enable RMM async memory allocator (implies --disable-rmm-pool)",
)
cluster_args.add_argument(
"--rmm-release-threshold",
default=None,
type=parse_bytes,
help="When --enable-rmm-async is set and the pool size grows beyond this "
"value, unused memory held by the pool will be released at the next "
"synchronization point. Can be an integer (bytes), or a string (like "
"'4GB' or '5000M'). By default, this feature is disabled.",
)
cluster_args.add_argument(
"--rmm-log-directory",
default=None,
Expand Down Expand Up @@ -358,6 +367,7 @@ def setup_memory_pool(
disable_pool=False,
rmm_async=False,
rmm_managed=False,
release_threshold=None,
log_directory=None,
statistics=False,
):
Expand All @@ -371,7 +381,11 @@ def setup_memory_pool(
logging = log_directory is not None

if rmm_async:
rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
rmm.mr.set_current_device_resource(
rmm.mr.CudaAsyncMemoryResource(
initial_pool_size=pool_size, release_threshold=release_threshold
)
)
cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
else:
rmm.reinitialize(
Expand All @@ -395,6 +409,7 @@ def setup_memory_pools(
disable_pool,
rmm_async,
rmm_managed,
release_threshold,
log_directory,
statistics,
):
Expand All @@ -406,6 +421,7 @@ def setup_memory_pools(
disable_pool=disable_pool,
rmm_async=rmm_async,
rmm_managed=rmm_managed,
release_threshold=release_threshold,
log_directory=log_directory,
statistics=statistics,
)
Expand All @@ -417,6 +433,7 @@ def setup_memory_pools(
disable_pool=disable_pool,
rmm_async=rmm_async,
rmm_managed=rmm_managed,
release_threshold=release_threshold,
log_directory=log_directory,
statistics=statistics,
)
Expand Down
13 changes: 13 additions & 0 deletions dask_cuda/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ def cuda():
incompatible with RMM pools and managed memory, trying to enable both will
result in failure.""",
)
@click.option(
"--rmm-release-threshold",
default=None,
help="""When ``rmm.async`` is ``True`` and the pool size grows beyond this value, unused
memory held by the pool will be released at the next synchronization point. Can be
an integer (bytes), float (fraction of total device memory), string (like ``"5GB"``
or ``"5000M"``) or ``None``. By default, this feature is disabled.
.. note::
This size is a per-worker configuration, and not cluster-wide.""",
)
@click.option(
"--rmm-log-directory",
default=None,
Expand Down Expand Up @@ -312,6 +323,7 @@ def worker(
rmm_maximum_pool_size,
rmm_managed_memory,
rmm_async,
rmm_release_threshold,
rmm_log_directory,
rmm_track_allocations,
pid_file,
Expand Down Expand Up @@ -383,6 +395,7 @@ def worker(
rmm_maximum_pool_size,
rmm_managed_memory,
rmm_async,
rmm_release_threshold,
rmm_log_directory,
rmm_track_allocations,
pid_file,
Expand Down
20 changes: 8 additions & 12 deletions dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_release_threshold=None,
rmm_log_directory=None,
rmm_track_allocations=False,
pid_file=None,
Expand Down Expand Up @@ -138,12 +139,6 @@ def del_pid_file():
"For installation instructions, please see "
"https://github.com/rapidsai/rmm"
) # pragma: no cover
if rmm_async:
raise ValueError(
"RMM pool and managed memory are incompatible with asynchronous "
"allocator"
)

else:
if enable_nvlink:
warnings.warn(
Expand Down Expand Up @@ -215,12 +210,13 @@ def del_pid_file():
get_cpu_affinity(nvml_device_index(i, cuda_visible_devices(i)))
),
RMMSetup(
rmm_pool_size,
rmm_maximum_pool_size,
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
initial_pool_size=rmm_pool_size,
maximum_pool_size=rmm_maximum_pool_size,
managed_memory=rmm_managed_memory,
async_alloc=rmm_async,
release_threshold=rmm_release_threshold,
log_directory=rmm_log_directory,
track_allocations=rmm_track_allocations,
),
PreImport(pre_import),
},
Expand Down
38 changes: 24 additions & 14 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ class LocalCUDACluster(LocalCluster):
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
incompatible with RMM pools and managed memory. Trying to enable both will
result in an exception.
rmm_release_threshold: int, str or None, default None
        When ``rmm.async`` is ``True`` and the pool size grows beyond this value, unused
memory held by the pool will be released at the next synchronization point.
Can be an integer (bytes), float (fraction of total device memory), string (like
``"5GB"`` or ``"5000M"``) or ``None``. By default, this feature is disabled.
.. note::
This size is a per-worker configuration, and not cluster-wide.
rmm_log_directory : str or None, default None
Directory to write per-worker RMM log files to. The client and scheduler are not
logged here. Can be a string (like ``"/path/to/logs/"``) or ``None`` to
Expand Down Expand Up @@ -178,8 +186,12 @@ class LocalCUDACluster(LocalCluster):
TypeError
If InfiniBand or NVLink are enabled and ``protocol!="ucx"``.
ValueError
If NVLink and RMM managed memory are both enabled, or if RMM pools / managed
memory and asynchronous allocator are both enabled.
If RMM pool, RMM managed memory or RMM async allocator are requested but RMM
cannot be imported.
If RMM managed memory and asynchronous allocator are both enabled.
If RMM maximum pool size is set but RMM pool size is not.
If RMM maximum pool size is set but RMM async allocator is used.
If RMM release threshold is set but the RMM async allocator is not being used.
See Also
--------
Expand All @@ -205,6 +217,7 @@ def __init__(
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
rmm_release_threshold=None,
rmm_log_directory=None,
rmm_track_allocations=False,
jit_unspill=None,
Expand Down Expand Up @@ -247,7 +260,8 @@ def __init__(
self.rmm_maximum_pool_size = rmm_maximum_pool_size
self.rmm_managed_memory = rmm_managed_memory
self.rmm_async = rmm_async
if rmm_pool_size is not None or rmm_managed_memory:
self.rmm_release_threshold = rmm_release_threshold
if rmm_pool_size is not None or rmm_managed_memory or rmm_async:
try:
import rmm # noqa F401
except ImportError:
Expand All @@ -256,11 +270,6 @@ def __init__(
"is not available. For installation instructions, please "
"see https://github.com/rapidsai/rmm"
) # pragma: no cover
if rmm_async:
raise ValueError(
"RMM pool and managed memory are incompatible with asynchronous "
"allocator"
)
else:
if enable_nvlink:
warnings.warn(
Expand Down Expand Up @@ -385,12 +394,13 @@ def new_worker_spec(self):
get_cpu_affinity(nvml_device_index(0, visible_devices))
),
RMMSetup(
self.rmm_pool_size,
self.rmm_maximum_pool_size,
self.rmm_managed_memory,
self.rmm_async,
self.rmm_log_directory,
self.rmm_track_allocations,
initial_pool_size=self.rmm_pool_size,
maximum_pool_size=self.rmm_maximum_pool_size,
managed_memory=self.rmm_managed_memory,
async_alloc=self.rmm_async,
release_threshold=self.rmm_release_threshold,
log_directory=self.rmm_log_directory,
track_allocations=self.rmm_track_allocations,
),
PreImport(self.pre_import),
},
Expand Down
9 changes: 9 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ def test_rmm_async(loop): # noqa: F811
"--host",
"127.0.0.1",
"--rmm-async",
"--rmm-pool-size",
"2 GB",
"--rmm-release-threshold",
"3 GB",
"--no-dashboard",
]
):
Expand All @@ -143,6 +147,11 @@ def test_rmm_async(loop): # noqa: F811
for v in memory_resource_type.values():
assert v is rmm.mr.CudaAsyncMemoryResource

ret = get_cluster_configuration(client)
wait(ret)
assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000
assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000


def test_rmm_logging(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
Expand Down
6 changes: 6 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ async def test_rmm_async():

async with LocalCUDACluster(
rmm_async=True,
rmm_pool_size="2GB",
rmm_release_threshold="3GB",
asynchronous=True,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
Expand All @@ -240,6 +242,10 @@ async def test_rmm_async():
for v in memory_resource_type.values():
assert v is rmm.mr.CudaAsyncMemoryResource

ret = await get_cluster_configuration(client)
assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000
assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000


@gen_test(timeout=20)
async def test_rmm_logging():
Expand Down
32 changes: 28 additions & 4 deletions dask_cuda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
maximum_pool_size,
managed_memory,
async_alloc,
release_threshold,
log_directory,
track_allocations,
):
Expand All @@ -54,20 +55,46 @@ def __init__(
"`rmm_maximum_pool_size` was specified without specifying "
"`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool."
)
if async_alloc is True and managed_memory is True:
raise ValueError(
                "`rmm_managed_memory` is incompatible with `rmm_async`."
)
if async_alloc is True and maximum_pool_size is not None:
raise ValueError(
                "`rmm_maximum_pool_size` is incompatible with `rmm_async`."
)
if async_alloc is False and release_threshold is not None:
raise ValueError("`rmm_release_threshold` requires `rmm_async`.")

self.initial_pool_size = initial_pool_size
self.maximum_pool_size = maximum_pool_size
self.managed_memory = managed_memory
self.async_alloc = async_alloc
self.release_threshold = release_threshold
self.logging = log_directory is not None
self.log_directory = log_directory
self.rmm_track_allocations = track_allocations

def setup(self, worker=None):
if self.initial_pool_size is not None:
self.initial_pool_size = parse_device_memory_limit(
self.initial_pool_size, alignment_size=256
)

if self.async_alloc:
import rmm

rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
if self.release_threshold is not None:
self.release_threshold = parse_device_memory_limit(
self.release_threshold, alignment_size=256
)

rmm.mr.set_current_device_resource(
rmm.mr.CudaAsyncMemoryResource(
initial_pool_size=self.initial_pool_size,
release_threshold=self.release_threshold,
)
)
if self.logging:
rmm.enable_logging(
log_file_name=get_rmm_log_file_name(
Expand All @@ -80,9 +107,6 @@ def setup(self, worker=None):
pool_allocator = False if self.initial_pool_size is None else True

if self.initial_pool_size is not None:
self.initial_pool_size = parse_device_memory_limit(
self.initial_pool_size, alignment_size=256
)
if self.maximum_pool_size is not None:
self.maximum_pool_size = parse_device_memory_limit(
self.maximum_pool_size, alignment_size=256
Expand Down

0 comments on commit 3fadc30

Please sign in to comment.