diff --git a/ci/test_python.sh b/ci/test_python.sh index ef24c848..78330a40 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -99,6 +99,59 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms +DASK_DATAFRAME__QUERY_PLANNING=True \ +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --disable-rmm \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms + +DASK_DATAFRAME__QUERY_PLANNING=True \ +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --disable-rmm-pool \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms + +DASK_DATAFRAME__QUERY_PLANNING=True \ +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --rmm-pool-size 2GiB \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms + +DASK_DATAFRAME__QUERY_PLANNING=True \ +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --rmm-pool-size 2GiB \ + --rmm-maximum-pool-size 4GiB \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms + +DASK_DATAFRAME__QUERY_PLANNING=True \ +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --rmm-pool-size 2GiB \ + --rmm-maximum-pool-size 4GiB \ + --enable-rmm-async \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms + +DASK_DATAFRAME__QUERY_PLANNING=True \ +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --rmm-pool-size 2GiB \ + --rmm-maximum-pool-size 4GiB \ + --enable-rmm-managed \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms + rapids-logger "Run local benchmark (legacy dd)" DASK_DATAFRAME__QUERY_PLANNING=False \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py index 1335334a..7f48d4fa 100644 --- a/dask_cuda/benchmarks/common.py +++ b/dask_cuda/benchmarks/common.py @@ -117,16 +117,18 @@ def run(client: Client, args: Namespace, config: Config): wait_for_cluster(client, shutdown_on_failure=True) assert len(client.scheduler_info()["workers"]) > 0 setup_memory_pools( - client, - args.type == "gpu", - args.rmm_pool_size, - args.disable_rmm_pool, - args.enable_rmm_async, - args.enable_rmm_managed, - args.rmm_release_threshold, - args.rmm_log_directory, - args.enable_rmm_statistics, - args.enable_rmm_track_allocations, + client=client, + is_gpu=args.type == "gpu", + disable_rmm=args.disable_rmm, + disable_rmm_pool=args.disable_rmm_pool, + pool_size=args.rmm_pool_size, + maximum_pool_size=args.rmm_maximum_pool_size, + rmm_async=args.enable_rmm_async, + rmm_managed=args.enable_rmm_managed, + release_threshold=args.rmm_release_threshold, + log_directory=args.rmm_log_directory, + statistics=args.enable_rmm_statistics, + rmm_track_allocations=args.enable_rmm_track_allocations, ) address_to_index, results, message_data = gather_bench_results(client, args, config) p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index) diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 5ac79a88..48e4755f 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -17,6 +17,7 @@ from distributed.comm.addressing import get_address_host from dask_cuda.local_cuda_cluster import LocalCUDACluster +from dask_cuda.utils import parse_device_memory_limit def as_noop(dsk): @@ -93,15 +94,41 @@ def parse_benchmark_args( "'forkserver' can be used to avoid issues with fork not being allowed " "after the networking stack has been initialised.", ) + cluster_args.add_argument( + "--disable-rmm", + action="store_true", + help="Disable RMM.", + ) + cluster_args.add_argument( + "--disable-rmm-pool", + action="store_true", + help="Uses RMM for allocations but without a memory pool.", + ) cluster_args.add_argument( "--rmm-pool-size", default=None, type=parse_bytes, help="The size of the RMM memory pool. Can be an integer (bytes) or a string " - "(like '4GB' or '5000M'). By default, 1/2 of the total GPU memory is used.", + "(like '4GB' or '5000M'). By default, 1/2 of the total GPU memory is used." + "" + ".. note::" + " This size is a per-worker configuration, and not cluster-wide.", ) cluster_args.add_argument( - "--disable-rmm-pool", action="store_true", help="Disable the RMM memory pool" + "--rmm-maximum-pool-size", + default=None, + help="When ``--rmm-pool-size`` is specified, this argument indicates the " + "maximum pool size. Can be an integer (bytes), or a string (like '4GB' or " + "'5000M'). By default, the total available memory on the GPU is used. " + "``rmm_pool_size`` must be specified to use RMM pool and to set the maximum " + "pool size." + "" + ".. note::" + " When paired with `--enable-rmm-async` the maximum size cannot be " + " guaranteed due to fragmentation." + "" + ".. note::" + " This size is a per-worker configuration, and not cluster-wide.", ) cluster_args.add_argument( "--enable-rmm-managed", @@ -407,10 +434,29 @@ def get_worker_device(): return -1 +def setup_rmm_resources(statistics=False, rmm_track_allocations=False): + import cupy + + import rmm + from rmm.allocators.cupy import rmm_cupy_allocator + + cupy.cuda.set_allocator(rmm_cupy_allocator) + if statistics: + rmm.mr.set_current_device_resource( + rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource()) + ) + if rmm_track_allocations: + rmm.mr.set_current_device_resource( + rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource()) + ) + + def setup_memory_pool( dask_worker=None, + disable_rmm=None, + disable_rmm_pool=None, pool_size=None, - disable_pool=False, + maximum_pool_size=None, rmm_async=False, rmm_managed=False, release_threshold=None, @@ -418,45 +464,66 @@ def setup_memory_pool( statistics=False, rmm_track_allocations=False, ): - import cupy - import rmm - from rmm.allocators.cupy import rmm_cupy_allocator from dask_cuda.utils import get_rmm_log_file_name logging = log_directory is not None - if rmm_async: - rmm.mr.set_current_device_resource( - rmm.mr.CudaAsyncMemoryResource( - initial_pool_size=pool_size, release_threshold=release_threshold - ) - ) - else: - rmm.reinitialize( - pool_allocator=not disable_pool, - managed_memory=rmm_managed, - initial_pool_size=pool_size, - logging=logging, - log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory), - ) - cupy.cuda.set_allocator(rmm_cupy_allocator) - if statistics: - rmm.mr.set_current_device_resource( - rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource()) + if pool_size is not None: + pool_size = parse_device_memory_limit(pool_size, alignment_size=256) + + if maximum_pool_size is not None: + maximum_pool_size = parse_device_memory_limit( + maximum_pool_size, alignment_size=256 ) - if rmm_track_allocations: - rmm.mr.set_current_device_resource( - rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource()) + + if release_threshold is not None: + release_threshold = parse_device_memory_limit( + release_threshold, alignment_size=256 ) + if not disable_rmm: + if rmm_async: + mr = rmm.mr.CudaAsyncMemoryResource( + initial_pool_size=pool_size, + release_threshold=release_threshold, + ) + + if maximum_pool_size is not None: + mr = rmm.mr.LimitingResourceAdaptor( + mr, allocation_limit=maximum_pool_size + ) + + rmm.mr.set_current_device_resource(mr) + + setup_rmm_resources( + statistics=statistics, rmm_track_allocations=rmm_track_allocations + ) + else: + rmm.reinitialize( + pool_allocator=not disable_rmm_pool, + managed_memory=rmm_managed, + initial_pool_size=pool_size, + maximum_pool_size=maximum_pool_size, + logging=logging, + log_file_name=get_rmm_log_file_name( + dask_worker, logging, log_directory + ), + ) + + setup_rmm_resources( + statistics=statistics, rmm_track_allocations=rmm_track_allocations + ) + def setup_memory_pools( client, is_gpu, + disable_rmm, + disable_rmm_pool, pool_size, - disable_pool, + maximum_pool_size, rmm_async, rmm_managed, release_threshold, @@ -468,8 +535,10 @@ def setup_memory_pools( return client.run( setup_memory_pool, + disable_rmm=disable_rmm, + disable_rmm_pool=disable_rmm_pool, pool_size=pool_size, - disable_pool=disable_pool, + maximum_pool_size=maximum_pool_size, rmm_async=rmm_async, rmm_managed=rmm_managed, release_threshold=release_threshold, @@ -482,7 +551,9 @@ def setup_memory_pools( client.run_on_scheduler( setup_memory_pool, pool_size=1e9, - disable_pool=disable_pool, + disable_rmm=disable_rmm, + disable_rmm_pool=disable_rmm_pool, + maximum_pool_size=maximum_pool_size, rmm_async=rmm_async, rmm_managed=rmm_managed, release_threshold=release_threshold, diff --git a/dask_cuda/cli.py b/dask_cuda/cli.py index cc2d0843..ba58fe3e 100644 --- a/dask_cuda/cli.py +++ b/dask_cuda/cli.py @@ -120,6 +120,10 @@ def cuda(): memory on the GPU is used. ``rmm_pool_size`` must be specified to use RMM pool and to set the maximum pool size. + .. note:: + When paired with `--enable-rmm-async` the maximum size cannot be guaranteed due + to fragmentation. + .. note:: This size is a per-worker configuration, and not cluster-wide.""", ) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 7a5c8c13..1b81c770 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -114,6 +114,10 @@ class LocalCUDACluster(LocalCluster): memory on the GPU is used. ``rmm_pool_size`` must be specified to use RMM pool and to set the maximum pool size. + .. note:: + When paired with `--enable-rmm-async` the maximum size cannot be guaranteed + due to fragmentation. + .. note:: This size is a per-worker configuration, and not cluster-wide. rmm_managed_memory : bool, default False