diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py
index 0b417e7b3..c7e0cb833 100644
--- a/dask_cuda/benchmarks/common.py
+++ b/dask_cuda/benchmarks/common.py
@@ -123,6 +123,7 @@ def run(client: Client, args: Namespace, config: Config):
         args.disable_rmm_pool,
         args.enable_rmm_async,
         args.enable_rmm_managed,
+        args.rmm_release_threshold,
         args.rmm_log_directory,
         args.enable_rmm_statistics,
     )
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index a7f51ce9b..1de8868e4 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -108,6 +108,15 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
         action="store_true",
         help="Enable RMM async memory allocator (implies --disable-rmm-pool)",
     )
+    cluster_args.add_argument(
+        "--rmm-release-threshold",
+        default=None,
+        type=parse_bytes,
+        help="When --enable-rmm-async is set and the pool size grows beyond this "
+        "value, unused memory held by the pool will be released at the next "
+        "synchronization point. Can be an integer (bytes) or a string (like "
+        "'4GB' or '5000M'). By default, this feature is disabled.",
+    )
     cluster_args.add_argument(
         "--rmm-log-directory",
         default=None,
@@ -358,6 +367,7 @@ def setup_memory_pool(
     disable_pool=False,
     rmm_async=False,
     rmm_managed=False,
+    release_threshold=None,
     log_directory=None,
     statistics=False,
 ):
@@ -371,7 +381,11 @@ def setup_memory_pool(
     logging = log_directory is not None

     if rmm_async:
-        rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
+        rmm.mr.set_current_device_resource(
+            rmm.mr.CudaAsyncMemoryResource(
+                initial_pool_size=pool_size, release_threshold=release_threshold
+            )
+        )
         cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
     else:
         rmm.reinitialize(
@@ -395,6 +409,7 @@ def setup_memory_pools(
     disable_pool,
     rmm_async,
     rmm_managed,
+    release_threshold,
     log_directory,
     statistics,
 ):
@@ -406,6 +421,7 @@ def setup_memory_pools(
         disable_pool=disable_pool,
         rmm_async=rmm_async,
         rmm_managed=rmm_managed,
+        release_threshold=release_threshold,
         log_directory=log_directory,
         statistics=statistics,
     )
@@ -417,6 +433,7 @@ def setup_memory_pools(
         disable_pool=disable_pool,
         rmm_async=rmm_async,
         rmm_managed=rmm_managed,
+        release_threshold=release_threshold,
         log_directory=log_directory,
         statistics=statistics,
     )
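Taken together, the benchmark changes above boil down to the following per-worker allocator setup. This is a minimal sketch of what `setup_memory_pool` now does, assuming RMM, CuPy, and a CUDA 11.2+ capable GPU are available; the 2 GB/3 GB sizes are illustrative, not defaults:

```python
# Per-worker allocator setup when --enable-rmm-async and
# --rmm-release-threshold are both given (illustrative sizes).
import cupy
import rmm
from dask.utils import parse_bytes

pool_size = parse_bytes("2GB")          # from --rmm-pool-size
release_threshold = parse_bytes("3GB")  # from --rmm-release-threshold

# Once the async pool grows past `release_threshold`, unused memory is
# released back to the device at the next synchronization point.
rmm.mr.set_current_device_resource(
    rmm.mr.CudaAsyncMemoryResource(
        initial_pool_size=pool_size, release_threshold=release_threshold
    )
)
cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
```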
diff --git a/dask_cuda/cli.py b/dask_cuda/cli.py
index b7069d632..5a6e3db07 100644
--- a/dask_cuda/cli.py
+++ b/dask_cuda/cli.py
@@ -145,6 +145,17 @@ def cuda():
     incompatible with RMM pools and managed memory, trying to enable both will
     result in failure.""",
 )
+@click.option(
+    "--rmm-release-threshold",
+    default=None,
+    help="""When ``--rmm-async`` is enabled and the pool size grows beyond this value,
+    unused memory held by the pool will be released at the next synchronization point.
+    Can be an integer (bytes), float (fraction of total device memory), string (like
+    ``"5GB"`` or ``"5000M"``) or ``None``. By default, this feature is disabled.
+
+    .. note::
+        This size is a per-worker configuration, and not cluster-wide.""",
+)
 @click.option(
     "--rmm-log-directory",
     default=None,
@@ -312,6 +323,7 @@ def worker(
     rmm_maximum_pool_size,
     rmm_managed_memory,
     rmm_async,
+    rmm_release_threshold,
     rmm_log_directory,
     rmm_track_allocations,
     pid_file,
@@ -383,6 +395,7 @@ def worker(
         rmm_maximum_pool_size,
         rmm_managed_memory,
         rmm_async,
+        rmm_release_threshold,
         rmm_log_directory,
         rmm_track_allocations,
         pid_file,
diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py
index 03b16b529..f12ad6780 100644
--- a/dask_cuda/cuda_worker.py
+++ b/dask_cuda/cuda_worker.py
@@ -47,6 +47,7 @@ def __init__(
         rmm_maximum_pool_size=None,
         rmm_managed_memory=False,
         rmm_async=False,
+        rmm_release_threshold=None,
         rmm_log_directory=None,
         rmm_track_allocations=False,
         pid_file=None,
@@ -138,12 +139,6 @@ def del_pid_file():
                 "For installation instructions, please see "
                 "https://github.com/rapidsai/rmm"
             )  # pragma: no cover
-            if rmm_async:
-                raise ValueError(
-                    "RMM pool and managed memory are incompatible with asynchronous "
-                    "allocator"
-                )
-
         else:
             if enable_nvlink:
                 warnings.warn(
@@ -215,12 +210,13 @@ def del_pid_file():
                     get_cpu_affinity(nvml_device_index(i, cuda_visible_devices(i)))
                 ),
                 RMMSetup(
-                    rmm_pool_size,
-                    rmm_maximum_pool_size,
-                    rmm_managed_memory,
-                    rmm_async,
-                    rmm_log_directory,
-                    rmm_track_allocations,
+                    initial_pool_size=rmm_pool_size,
+                    maximum_pool_size=rmm_maximum_pool_size,
+                    managed_memory=rmm_managed_memory,
+                    async_alloc=rmm_async,
+                    release_threshold=rmm_release_threshold,
+                    log_directory=rmm_log_directory,
+                    track_allocations=rmm_track_allocations,
                ),
                 PreImport(pre_import),
             },
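For reference, a hedged usage sketch of the new worker CLI option, mirroring the flags exercised by the updated tests further down; the scheduler address is a placeholder, and the `dask cuda worker` entry point is assumed to be on PATH:

```python
# Launch a CUDA worker with the async allocator, a 2 GB initial pool and a
# 3 GB release threshold (illustrative sizes; placeholder scheduler address).
import subprocess

subprocess.run(
    [
        "dask", "cuda", "worker", "tcp://127.0.0.1:8786",
        "--rmm-async",
        "--rmm-pool-size", "2GB",
        "--rmm-release-threshold", "3GB",
    ],
    check=True,
)
```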
diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index fa532b5f0..656f6140d 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -131,6 +131,14 @@ class LocalCUDACluster(LocalCluster):
         The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
         incompatible with RMM pools and managed memory. Trying to enable both will
         result in an exception.
+    rmm_release_threshold: int, str or None, default None
+        When ``rmm_async=True`` and the pool size grows beyond this value, unused
+        memory held by the pool will be released at the next synchronization point.
+        Can be an integer (bytes), float (fraction of total device memory), string (like
+        ``"5GB"`` or ``"5000M"``) or ``None``. By default, this feature is disabled.
+
+        .. note::
+            This size is a per-worker configuration, and not cluster-wide.
     rmm_log_directory : str or None, default None
         Directory to write per-worker RMM log files to. The client and scheduler
         are not logged here. Can be a string (like ``"/path/to/logs/"``) or ``None`` to
@@ -178,8 +186,12 @@ class LocalCUDACluster(LocalCluster):
     TypeError
         If InfiniBand or NVLink are enabled and ``protocol!="ucx"``.
     ValueError
-        If NVLink and RMM managed memory are both enabled, or if RMM pools / managed
-        memory and asynchronous allocator are both enabled.
+        If RMM pool, RMM managed memory or RMM async allocator are requested but RMM
+        cannot be imported.
+        If RMM managed memory and asynchronous allocator are both enabled.
+        If RMM maximum pool size is set but RMM pool size is not.
+        If RMM maximum pool size is set but RMM async allocator is used.
+        If RMM release threshold is set but the RMM async allocator is not being used.

     See Also
     --------
@@ -205,6 +217,7 @@ def __init__(
         rmm_maximum_pool_size=None,
         rmm_managed_memory=False,
         rmm_async=False,
+        rmm_release_threshold=None,
         rmm_log_directory=None,
         rmm_track_allocations=False,
         jit_unspill=None,
@@ -247,7 +260,8 @@ def __init__(
         self.rmm_maximum_pool_size = rmm_maximum_pool_size
         self.rmm_managed_memory = rmm_managed_memory
         self.rmm_async = rmm_async
-        if rmm_pool_size is not None or rmm_managed_memory:
+        self.rmm_release_threshold = rmm_release_threshold
+        if rmm_pool_size is not None or rmm_managed_memory or rmm_async:
             try:
                 import rmm  # noqa F401
             except ImportError:
@@ -256,11 +270,6 @@ def __init__(
                     "is not available. For installation instructions, please "
                     "see https://github.com/rapidsai/rmm"
                 )  # pragma: no cover
-            if rmm_async:
-                raise ValueError(
-                    "RMM pool and managed memory are incompatible with asynchronous "
-                    "allocator"
-                )
         else:
             if enable_nvlink:
                 warnings.warn(
@@ -385,12 +394,13 @@ def new_worker_spec(self):
                 get_cpu_affinity(nvml_device_index(0, visible_devices))
             ),
             RMMSetup(
-                self.rmm_pool_size,
-                self.rmm_maximum_pool_size,
-                self.rmm_managed_memory,
-                self.rmm_async,
-                self.rmm_log_directory,
-                self.rmm_track_allocations,
+                initial_pool_size=self.rmm_pool_size,
+                maximum_pool_size=self.rmm_maximum_pool_size,
+                managed_memory=self.rmm_managed_memory,
+                async_alloc=self.rmm_async,
+                release_threshold=self.rmm_release_threshold,
+                log_directory=self.rmm_log_directory,
+                track_allocations=self.rmm_track_allocations,
             ),
             PreImport(self.pre_import),
         },
diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index 64950e2b6..9f5d82d9d 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -131,6 +131,10 @@ def test_rmm_async(loop):  # noqa: F811
             "--host",
             "127.0.0.1",
             "--rmm-async",
+            "--rmm-pool-size",
+            "2 GB",
+            "--rmm-release-threshold",
+            "3 GB",
             "--no-dashboard",
         ]
     ):
@@ -143,6 +147,11 @@ def test_rmm_async(loop):  # noqa: F811
             for v in memory_resource_type.values():
                 assert v is rmm.mr.CudaAsyncMemoryResource

+            ret = get_cluster_configuration(client)
+            wait(ret)
+            assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000
+            assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000
+

 def test_rmm_logging(loop):  # noqa: F811
     rmm = pytest.importorskip("rmm")
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index b0ac88234..987055636 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -231,6 +231,8 @@ async def test_rmm_async():

     async with LocalCUDACluster(
         rmm_async=True,
+        rmm_pool_size="2GB",
+        rmm_release_threshold="3GB",
         asynchronous=True,
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
@@ -240,6 +242,10 @@ async def test_rmm_async():
             for v in memory_resource_type.values():
                 assert v is rmm.mr.CudaAsyncMemoryResource

+            ret = await get_cluster_configuration(client)
+            assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000
+            assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000
+

 @gen_test(timeout=20)
 async def test_rmm_logging():
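The equivalent Python API usage, following the updated `test_rmm_async` above — a minimal sketch assuming a GPU-equipped environment with dask-cuda and RMM installed; the sizes are again illustrative:

```python
# Minimal sketch of the new keyword on LocalCUDACluster.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    with LocalCUDACluster(
        rmm_async=True,               # release threshold requires the async allocator
        rmm_pool_size="2GB",          # initial size of the async pool
        rmm_release_threshold="3GB",  # shrink the pool back past this point
    ) as cluster:
        with Client(cluster) as client:
            # Submit GPU work here; each worker now allocates through
            # rmm.mr.CudaAsyncMemoryResource.
            pass
```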
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index 5e558fbc5..468c37f47 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -46,6 +46,7 @@ def __init__(
         maximum_pool_size,
         managed_memory,
         async_alloc,
+        release_threshold,
         log_directory,
         track_allocations,
     ):
@@ -54,20 +55,46 @@ def __init__(
                 "`rmm_maximum_pool_size` was specified without specifying "
                 "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool."
             )
+        if async_alloc is True and managed_memory is True:
+            raise ValueError(
+                "`rmm_managed_memory` is incompatible with `rmm_async`."
+            )
+        if async_alloc is True and maximum_pool_size is not None:
+            raise ValueError(
+                "`rmm_maximum_pool_size` is incompatible with `rmm_async`."
+            )
+        if async_alloc is False and release_threshold is not None:
+            raise ValueError("`rmm_release_threshold` requires `rmm_async`.")

         self.initial_pool_size = initial_pool_size
         self.maximum_pool_size = maximum_pool_size
         self.managed_memory = managed_memory
         self.async_alloc = async_alloc
+        self.release_threshold = release_threshold
         self.logging = log_directory is not None
         self.log_directory = log_directory
         self.rmm_track_allocations = track_allocations

     def setup(self, worker=None):
+        if self.initial_pool_size is not None:
+            self.initial_pool_size = parse_device_memory_limit(
+                self.initial_pool_size, alignment_size=256
+            )
+
         if self.async_alloc:
             import rmm

-            rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
+            if self.release_threshold is not None:
+                self.release_threshold = parse_device_memory_limit(
+                    self.release_threshold, alignment_size=256
+                )
+
+            rmm.mr.set_current_device_resource(
+                rmm.mr.CudaAsyncMemoryResource(
+                    initial_pool_size=self.initial_pool_size,
+                    release_threshold=self.release_threshold,
+                )
+            )
             if self.logging:
                 rmm.enable_logging(
                     log_file_name=get_rmm_log_file_name(
@@ -80,9 +107,6 @@ def setup(self, worker=None):
             pool_allocator = False if self.initial_pool_size is None else True

             if self.initial_pool_size is not None:
-                self.initial_pool_size = parse_device_memory_limit(
-                    self.initial_pool_size, alignment_size=256
-                )
                 if self.maximum_pool_size is not None:
                     self.maximum_pool_size = parse_device_memory_limit(
                         self.maximum_pool_size, alignment_size=256
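A quick sanity check of the validation added to `RMMSetup.__init__` above — a sketch constructing the plugin directly with the keyword signature from this diff; it runs without a GPU because the guard fires before any allocator is touched:

```python
# The new guard rejects a release threshold when the async allocator is off.
from dask_cuda.utils import RMMSetup

try:
    RMMSetup(
        initial_pool_size=None,
        maximum_pool_size=None,
        managed_memory=False,
        async_alloc=False,          # async allocator disabled...
        release_threshold="3GB",    # ...so a release threshold is invalid
        log_directory=None,
        track_allocations=False,
    )
except ValueError as err:
    print(err)  # `rmm_release_threshold` requires `rmm_async`.
```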