From e3e3a4e8c63312e7a73ce60d83c20a17ddb61d02 Mon Sep 17 00:00:00 2001
From: "Mads R. B. Kristensen"
Date: Wed, 11 Jan 2023 10:11:18 +0100
Subject: [PATCH 1/5] Use TrackingResourceAdaptor to get better debug info

---
 dask_cuda/local_cuda_cluster.py           |  2 +-
 dask_cuda/proxify_host_file.py            | 46 ++++++++++++++++++++---
 dask_cuda/tests/test_proxify_host_file.py | 29 ++++++++------
 3 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 115c419cd..fa532b5f0 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -124,7 +124,7 @@ class LocalCUDACluster(LocalCluster):
         Managed memory is currently incompatible with NVLink. Trying
         to enable both will result in an exception.
     rmm_async: bool, default False
-        Initialize each worker withh RMM and set it to use RMM's asynchronous allocator.
+        Initialize each worker with RMM and set it to use RMM's asynchronous allocator.
         See ``rmm.mr.CudaAsyncMemoryResource`` for more info.

         .. warning::
diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py
index f258776e5..88f7759d7 100644
--- a/dask_cuda/proxify_host_file.py
+++ b/dask_cuda/proxify_host_file.py
@@ -47,6 +47,38 @@
 T = TypeVar("T")


+def get_rmm_device_memory_usage() -> Optional[int]:
+    """Get current bytes allocated on current device through RMM
+
+    Check the current RMM resource stack for resources such as
+    StatisticsResourceAdaptor and TrackingResourceAdaptor that
+    can report the current allocated bytes. Returns None, if
+    no such resources exist.
+
+    Return
+    ------
+    nbytes: int or None
+        Number of bytes allocated on device through RMM or None
+    """
+
+    def get_rmm_memory_resource_stack(mr) -> list:
+        if hasattr(mr, "upstream_mr"):
+            return [mr] + get_rmm_memory_resource_stack(mr.upstream_mr)
+        return [mr]
+
+    try:
+        import rmm
+    except ImportError:
+        return None
+
+    for mr in get_rmm_memory_resource_stack(rmm.mr.get_current_device_resource()):
+        if isinstance(mr, rmm.mr.TrackingResourceAdaptor):
+            return mr.get_allocated_bytes()
+        if isinstance(mr, rmm.mr.StatisticsResourceAdaptor):
+            return mr.allocation_counts["current_bytes"]
+    return None
+
+
 class Proxies(abc.ABC):
     """Abstract base class to implement tracking of proxies

@@ -591,12 +623,16 @@ def oom(nbytes: int) -> bool:
                traceback.print_stack(file=f)
                f.seek(0)
                tb = f.read()
+
+            dev_mem = get_rmm_device_memory_usage()
+            dev_msg = ""
+            if dev_mem is not None:
+                dev_msg = f"RMM allocs: {format_bytes(dev_mem)}, "
+
            self.logger.warning(
-                "RMM allocation of %s failed, spill-on-demand couldn't "
-                "find any device memory to spill:\n%s\ntraceback:\n%s\n",
-                format_bytes(nbytes),
-                self.manager.pprint(),
-                tb,
+                f"RMM allocation of {format_bytes(nbytes)} failed, "
+                "spill-on-demand couldn't find any device memory to "
+                f"spill.\n{dev_msg}{self.manager}, traceback:\n{tb}\n"
            )
            # Since we didn't find anything to spill, we give up.
            return False
diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py
index 09b5c9b46..1babaa2c5 100644
--- a/dask_cuda/tests/test_proxify_host_file.py
+++ b/dask_cuda/tests/test_proxify_host_file.py
@@ -1,4 +1,3 @@
-import re
 from typing import Iterable
 from unittest.mock import patch

@@ -10,6 +9,7 @@
 import dask.dataframe
 from dask.dataframe.shuffle import shuffle_group
 from dask.sizeof import sizeof
+from dask.utils import format_bytes
 from distributed import Client
 from distributed.utils_test import gen_test
 from distributed.worker import get_worker
@@ -448,25 +448,32 @@ def test_on_demand_debug_info():
     if not hasattr(rmm.mr, "FailureCallbackResourceAdaptor"):
         pytest.skip("RMM doesn't implement FailureCallbackResourceAdaptor")
-    total_mem = get_device_total_memory()
+    rmm_pool_size = 2**20

     def task():
-        rmm.DeviceBuffer(size=total_mem + 1)
+        (
+            rmm.DeviceBuffer(size=rmm_pool_size // 2),
+            rmm.DeviceBuffer(size=rmm_pool_size // 2),
+            rmm.DeviceBuffer(size=rmm_pool_size),  # Trigger OOM
+        )

-    with dask_cuda.LocalCUDACluster(n_workers=1, jit_unspill=True) as cluster:
+    with dask_cuda.LocalCUDACluster(
+        n_workers=1,
+        jit_unspill=True,
+        rmm_pool_size=rmm_pool_size,
+        rmm_maximum_pool_size=rmm_pool_size,
+        rmm_track_allocations=True,
+    ) as cluster:
         with Client(cluster) as client:
             # Warmup, which trigger the initialization of spill on demand
             client.submit(range, 10).result()

             # Submit too large RMM buffer
-            with pytest.raises(
-                MemoryError, match=r".*std::bad_alloc:.*CUDA error at:.*"
-            ):
+            with pytest.raises(MemoryError, match="Maximum pool size exceeded"):
                 client.submit(task).result()

             log = str(client.get_worker_logs())
-            assert re.search(
-                "WARNING - RMM allocation of .* failed, spill-on-demand", log
-            )
-            assert re.search(": Empty", log)
+            size = format_bytes(rmm_pool_size)
+            assert f"WARNING - RMM allocation of {size} failed" in log
+            assert f"RMM allocs: {size}" in log
             assert "traceback:" in log
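For context (not part of the patch series itself): the helper added above only returns a number when a tracking or statistics adaptor sits on the RMM resource stack. Below is a minimal sketch of such a stack, roughly what `rmm_track_allocations=True` sets up on a worker; the pool and buffer sizes are arbitrary illustrations.

```python
import rmm

# Build a small pool, wrap it in a TrackingResourceAdaptor and make it the
# current device resource -- the kind of stack get_rmm_device_memory_usage()
# walks when looking for an adaptor that can report allocated bytes.
pool = rmm.mr.PoolMemoryResource(
    rmm.mr.CudaMemoryResource(), initial_pool_size=2**21, maximum_pool_size=2**21
)
tracking = rmm.mr.TrackingResourceAdaptor(pool)
rmm.mr.set_current_device_resource(tracking)

buf = rmm.DeviceBuffer(size=2**20)  # 1 MiB allocated through the tracked pool
print(tracking.get_allocated_bytes())  # the figure reported as "RMM allocs: ..."
```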
From c5b4e771fc9c3e89f11f09583d6d08bb1000706f Mon Sep 17 00:00:00 2001
From: "Mads R. B. Kristensen"
Date: Wed, 11 Jan 2023 11:33:08 +0100
Subject: [PATCH 2/5] benchmarks: adding --enable-rmm-statistics

---
 dask_cuda/benchmarks/common.py |  1 +
 dask_cuda/benchmarks/utils.py  | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py
index 00aa31dcd..e734f882c 100644
--- a/dask_cuda/benchmarks/common.py
+++ b/dask_cuda/benchmarks/common.py
@@ -122,6 +122,7 @@ def run(client: Client, args: Namespace, config: Config):
         args.rmm_pool_size,
         args.disable_rmm_pool,
         args.rmm_log_directory,
+        args.enable_rmm_statistics,
     )
     address_to_index, results, message_data = gather_bench_results(client, args, config)
     p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index)
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 8a8419cd3..66b21861f 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -105,6 +105,11 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
         help="Directory to write worker and scheduler RMM log files to. "
         "Logging is only enabled if RMM memory pool is enabled.",
     )
+    cluster_args.add_argument(
+        "--enable-rmm-statistics",
+        action="store_true",
+        help="Use RMM's StatisticsResourceAdaptor to gather allocation statistics",
+    )
     cluster_args.add_argument(
         "--enable-tcp-over-ucx",
         default=None,
@@ -340,6 +345,7 @@ def setup_memory_pool(
     pool_size=None,
     disable_pool=False,
     log_directory=None,
+    statistics=False,
 ):
     import cupy

@@ -358,9 +364,15 @@
         log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory),
     )
     cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
+    if statistics:
+        rmm.mr.set_current_device_resource(
+            rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
+        )


-def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory):
+def setup_memory_pools(
+    client, is_gpu, pool_size, disable_pool, log_directory, statistics
+):
     if not is_gpu:
         return
     client.run(
@@ -368,6 +380,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory):
         pool_size=pool_size,
         disable_pool=disable_pool,
         log_directory=log_directory,
+        statistics=statistics,
     )
     # Create an RMM pool on the scheduler due to occasional deserialization
     # of CUDA objects. May cause issues with InfiniBand otherwise.
@@ -376,6 +389,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory):
         pool_size=1e9,
         disable_pool=disable_pool,
         log_directory=log_directory,
+        statistics=statistics,
     )
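Again for context (not part of the patches): when `--enable-rmm-statistics` is passed, `setup_memory_pool` wraps whatever memory resource is current in a `StatisticsResourceAdaptor`, roughly as in the sketch below; the buffer size is an arbitrary illustration.

```python
import rmm

# Wrap the current device resource (e.g. the benchmark's RMM pool) so that
# allocation statistics can be queried afterwards.
rmm.mr.set_current_device_resource(
    rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
)

buf = rmm.DeviceBuffer(size=1_000_000)
stats = rmm.mr.get_current_device_resource().allocation_counts
print(stats["current_bytes"])  # bytes currently allocated through RMM
```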
" "Logging is only enabled if RMM memory pool is enabled.", ) + cluster_args.add_argument( + "--enable-rmm-statistics", + action="store_true", + help="Use RMM's StatisticsResourceAdaptor to gather allocation statistics", + ) cluster_args.add_argument( "--enable-tcp-over-ucx", default=None, @@ -340,6 +345,7 @@ def setup_memory_pool( pool_size=None, disable_pool=False, log_directory=None, + statistics=False, ): import cupy @@ -358,9 +364,15 @@ def setup_memory_pool( log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory), ) cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) + if statistics: + rmm.mr.set_current_device_resource( + rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource()) + ) -def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): +def setup_memory_pools( + client, is_gpu, pool_size, disable_pool, log_directory, statistics +): if not is_gpu: return client.run( @@ -368,6 +380,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): pool_size=pool_size, disable_pool=disable_pool, log_directory=log_directory, + statistics=statistics, ) # Create an RMM pool on the scheduler due to occasional deserialization # of CUDA objects. May cause issues with InfiniBand otherwise. @@ -376,6 +389,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): pool_size=1e9, disable_pool=disable_pool, log_directory=log_directory, + statistics=statistics, ) From 1c4bb53b5f8fb0687691a381e613a43d13a41133 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 11 Jan 2023 16:29:33 +0100 Subject: [PATCH 3/5] doc --- dask_cuda/proxify_host_file.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 88f7759d7..ff6c269e7 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -51,9 +51,9 @@ def get_rmm_device_memory_usage() -> Optional[int]: """Get current bytes allocated on current device through RMM Check the current RMM resource stack for resources such as - StatisticsResourceAdaptor and TrackingResourceAdaptor that - can report the current allocated bytes. Returns None, if - no such resources exist. + `StatisticsResourceAdaptor` and `TrackingResourceAdaptor` + that can report the current allocated bytes. Returns None, + if no such resources exist. Return ------ From 7dea2bd43743fc0155712c38151dd1bcddb28b09 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 11 Jan 2023 16:32:42 +0100 Subject: [PATCH 4/5] moved get_rmm_device_memory_usage() to utils.py --- dask_cuda/proxify_host_file.py | 33 +----------------------------- dask_cuda/utils.py | 37 ++++++++++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index ff6c269e7..47bb3952a 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -43,42 +43,11 @@ from .is_spillable_object import cudf_spilling_status from .proxify_device_objects import proxify_device_objects, unproxify_device_objects from .proxy_object import ProxyObject +from .utils import get_rmm_device_memory_usage T = TypeVar("T") -def get_rmm_device_memory_usage() -> Optional[int]: - """Get current bytes allocated on current device through RMM - - Check the current RMM resource stack for resources such as - `StatisticsResourceAdaptor` and `TrackingResourceAdaptor` - that can report the current allocated bytes. 
From d70d9a01d5215add3fbc60efe33e576e2f9fc213 Mon Sep 17 00:00:00 2001
From: "Mads R. B. Kristensen"
Date: Wed, 11 Jan 2023 16:41:19 +0100
Subject: [PATCH 5/5] doc

---
 dask_cuda/benchmarks/utils.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 66b21861f..28d43cc13 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -108,7 +108,9 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
     cluster_args.add_argument(
         "--enable-rmm-statistics",
         action="store_true",
-        help="Use RMM's StatisticsResourceAdaptor to gather allocation statistics",
+        help="Use RMM's StatisticsResourceAdaptor to gather allocation statistics. "
+        "This enables spilling implementations such as JIT-Unspill to provide more "
+        "information on out-of-memory errors",
     )
     cluster_args.add_argument(
         "--enable-tcp-over-ucx",
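Taken together, a hypothetical end-to-end use of the series (a sketch with placeholder cluster settings, not an official example): start a cluster with a tracked RMM pool and poll the relocated helper on the workers.

```python
from distributed import Client

from dask_cuda import LocalCUDACluster
from dask_cuda.utils import get_rmm_device_memory_usage  # location after PATCH 4/5

if __name__ == "__main__":
    with LocalCUDACluster(
        n_workers=1,
        rmm_pool_size="1GiB",
        rmm_track_allocations=True,  # puts a TrackingResourceAdaptor on each worker
    ) as cluster:
        with Client(cluster) as client:
            # Maps each worker address to the bytes currently allocated through
            # RMM, or None on workers without a tracking/statistics adaptor.
            print(client.run(get_rmm_device_memory_usage))
```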