Allow disabling RMM in benchmarks #1352

Merged 5 commits on Jun 27, 2024
53 changes: 53 additions & 0 deletions ci/test_python.sh
@@ -99,6 +99,59 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm-pool \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
--enable-rmm-async \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
--enable-rmm-managed \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
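
For reference, a hedged sketch (not part of the diff) of how the flag combinations exercised above map onto the new RMM options; the keys mirror the setup_memory_pool keyword arguments introduced in dask_cuda/benchmarks/utils.py below.

# Illustrative mapping only; the "2GiB"/"4GiB" values follow the CI runs above.
rmm_configs = [
    {"disable_rmm": True},                                                    # --disable-rmm
    {"disable_rmm_pool": True},                                               # --disable-rmm-pool
    {"pool_size": "2GiB"},                                                    # --rmm-pool-size 2GiB
    {"pool_size": "2GiB", "maximum_pool_size": "4GiB"},                       # + --rmm-maximum-pool-size 4GiB
    {"pool_size": "2GiB", "maximum_pool_size": "4GiB", "rmm_async": True},    # + --enable-rmm-async
    {"pool_size": "2GiB", "maximum_pool_size": "4GiB", "rmm_managed": True},  # + --enable-rmm-managed
]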
22 changes: 12 additions & 10 deletions dask_cuda/benchmarks/common.py
@@ -117,16 +117,18 @@ def run(client: Client, args: Namespace, config: Config):
wait_for_cluster(client, shutdown_on_failure=True)
assert len(client.scheduler_info()["workers"]) > 0
setup_memory_pools(
client,
args.type == "gpu",
args.rmm_pool_size,
args.disable_rmm_pool,
args.enable_rmm_async,
args.enable_rmm_managed,
args.rmm_release_threshold,
args.rmm_log_directory,
args.enable_rmm_statistics,
args.enable_rmm_track_allocations,
client=client,
is_gpu=args.type == "gpu",
disable_rmm=args.disable_rmm,
disable_rmm_pool=args.disable_rmm_pool,
pool_size=args.rmm_pool_size,
maximum_pool_size=args.rmm_maximum_pool_size,
rmm_async=args.enable_rmm_async,
rmm_managed=args.enable_rmm_managed,
release_threshold=args.rmm_release_threshold,
log_directory=args.rmm_log_directory,
statistics=args.enable_rmm_statistics,
rmm_track_allocations=args.enable_rmm_track_allocations,
)
address_to_index, results, message_data = gather_bench_results(client, args, config)
p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index)
131 changes: 101 additions & 30 deletions dask_cuda/benchmarks/utils.py
@@ -17,6 +17,7 @@
from distributed.comm.addressing import get_address_host

from dask_cuda.local_cuda_cluster import LocalCUDACluster
from dask_cuda.utils import parse_device_memory_limit


def as_noop(dsk):
Expand Down Expand Up @@ -93,15 +94,41 @@ def parse_benchmark_args(
"'forkserver' can be used to avoid issues with fork not being allowed "
"after the networking stack has been initialised.",
)
cluster_args.add_argument(
"--disable-rmm",
action="store_true",
help="Disable RMM.",
)
cluster_args.add_argument(
"--disable-rmm-pool",
action="store_true",
help="Uses RMM for allocations but without a memory pool.",
)
cluster_args.add_argument(
"--rmm-pool-size",
default=None,
type=parse_bytes,
help="The size of the RMM memory pool. Can be an integer (bytes) or a string "
"(like '4GB' or '5000M'). By default, 1/2 of the total GPU memory is used.",
"(like '4GB' or '5000M'). By default, 1/2 of the total GPU memory is used."
""
".. note::"
" This size is a per-worker configuration, and not cluster-wide.",
)
cluster_args.add_argument(
"--disable-rmm-pool", action="store_true", help="Disable the RMM memory pool"
"--rmm-maximum-pool-size",
default=None,
help="When ``--rmm-pool-size`` is specified, this argument indicates the "
"maximum pool size. Can be an integer (bytes), or a string (like '4GB' or "
"'5000M'). By default, the total available memory on the GPU is used. "
"``rmm_pool_size`` must be specified to use RMM pool and to set the maximum "
"pool size."
""
".. note::"
" When paired with `--enable-rmm-async` the maximum size cannot be "
" guaranteed due to fragmentation."
""
".. note::"
" This size is a per-worker configuration, and not cluster-wide.",
)
cluster_args.add_argument(
"--enable-rmm-managed",
Expand Down Expand Up @@ -407,56 +434,96 @@ def get_worker_device():
return -1


def setup_rmm_resources(statistics=False, rmm_track_allocations=False):
import cupy

import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

cupy.cuda.set_allocator(rmm_cupy_allocator)
if statistics:
rmm.mr.set_current_device_resource(
rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
)
if rmm_track_allocations:
rmm.mr.set_current_device_resource(
rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource())
)
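
A minimal usage sketch of the new setup_rmm_resources helper (an illustration, not code from this PR), assuming RMM is installed and a pool has already been configured on the worker: it points CuPy at RMM and, when requested, wraps the current device resource with the statistics/tracking adaptors.

# Hedged sketch: a 2 GiB pool, then the shared resource setup. Afterwards the
# current device resource is a StatisticsResourceAdaptor wrapping the pool and
# CuPy allocates through rmm_cupy_allocator.
import rmm

rmm.reinitialize(pool_allocator=True, initial_pool_size=2 * 1024**3)
setup_rmm_resources(statistics=True, rmm_track_allocations=False)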


def setup_memory_pool(
dask_worker=None,
disable_rmm=None,
disable_rmm_pool=None,
pool_size=None,
disable_pool=False,
maximum_pool_size=None,
rmm_async=False,
rmm_managed=False,
release_threshold=None,
log_directory=None,
statistics=False,
rmm_track_allocations=False,
):
import cupy

import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

from dask_cuda.utils import get_rmm_log_file_name

logging = log_directory is not None

if rmm_async:
rmm.mr.set_current_device_resource(
rmm.mr.CudaAsyncMemoryResource(
initial_pool_size=pool_size, release_threshold=release_threshold
)
)
else:
rmm.reinitialize(
pool_allocator=not disable_pool,
managed_memory=rmm_managed,
initial_pool_size=pool_size,
logging=logging,
log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory),
)
cupy.cuda.set_allocator(rmm_cupy_allocator)
if statistics:
rmm.mr.set_current_device_resource(
rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
if pool_size is not None:
pool_size = parse_device_memory_limit(pool_size, alignment_size=256)

if maximum_pool_size is not None:
maximum_pool_size = parse_device_memory_limit(
maximum_pool_size, alignment_size=256
)
if rmm_track_allocations:
rmm.mr.set_current_device_resource(
rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource())

if release_threshold is not None:
release_threshold = parse_device_memory_limit(
release_threshold, alignment_size=256
)

if not disable_rmm:
if rmm_async:
mr = rmm.mr.CudaAsyncMemoryResource(
initial_pool_size=pool_size,
release_threshold=release_threshold,
)

if maximum_pool_size is not None:
mr = rmm.mr.LimitingResourceAdaptor(
mr, allocation_limit=maximum_pool_size
)

rmm.mr.set_current_device_resource(mr)

setup_rmm_resources(
statistics=statistics, rmm_track_allocations=rmm_track_allocations
)
else:
rmm.reinitialize(
pool_allocator=not disable_rmm_pool,
managed_memory=rmm_managed,
initial_pool_size=pool_size,
maximum_pool_size=maximum_pool_size,
logging=logging,
log_file_name=get_rmm_log_file_name(
dask_worker, logging, log_directory
),
)

setup_rmm_resources(
statistics=statistics, rmm_track_allocations=rmm_track_allocations
)
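
A hedged sketch of calling the reworked setup_memory_pool directly (outside Dask, so dask_worker stays None and no log file is written); human-readable sizes are accepted because they are normalized through parse_device_memory_limit above.

# Illustrative call, not part of the diff: async allocator with a 2 GiB initial
# pool, capped best-effort at 4 GiB via LimitingResourceAdaptor, plus the
# statistics adaptor and CuPy hook applied by setup_rmm_resources.
setup_memory_pool(
    disable_rmm=False,
    disable_rmm_pool=False,
    pool_size="2GiB",
    maximum_pool_size="4GiB",
    rmm_async=True,
    statistics=True,
)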


def setup_memory_pools(
client,
is_gpu,
disable_rmm,
disable_rmm_pool,
pool_size,
disable_pool,
maximum_pool_size,
rmm_async,
rmm_managed,
release_threshold,
@@ -468,8 +535,10 @@ def setup_memory_pools(
return
client.run(
setup_memory_pool,
disable_rmm=disable_rmm,
disable_rmm_pool=disable_rmm_pool,
pool_size=pool_size,
disable_pool=disable_pool,
maximum_pool_size=maximum_pool_size,
rmm_async=rmm_async,
rmm_managed=rmm_managed,
release_threshold=release_threshold,
@@ -482,7 +551,9 @@
client.run_on_scheduler(
setup_memory_pool,
pool_size=1e9,
disable_pool=disable_pool,
disable_rmm=disable_rmm,
disable_rmm_pool=disable_rmm_pool,
maximum_pool_size=maximum_pool_size,
rmm_async=rmm_async,
rmm_managed=rmm_managed,
release_threshold=release_threshold,
4 changes: 4 additions & 0 deletions dask_cuda/cli.py
@@ -120,6 +120,10 @@ def cuda():
memory on the GPU is used. ``rmm_pool_size`` must be specified to use RMM pool and
to set the maximum pool size.

.. note::
When paired with `--enable-rmm-async` the maximum size cannot be guaranteed due
to fragmentation.

.. note::
This size is a per-worker configuration, and not cluster-wide.""",
)
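
For context, a hedged illustration of why the note above says the cap is best-effort with the async allocator (mirroring the benchmark change in utils.py, not code from this file): the limit is applied by an adaptor stacked on top of the async pool, so fragmentation inside that pool can exhaust memory before the nominal maximum is reached.

# Illustrative only: a capped async resource, as the note describes.
import rmm

mr = rmm.mr.LimitingResourceAdaptor(
    rmm.mr.CudaAsyncMemoryResource(initial_pool_size=2 * 1024**3),
    allocation_limit=4 * 1024**3,
)
rmm.mr.set_current_device_resource(mr)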
4 changes: 4 additions & 0 deletions dask_cuda/local_cuda_cluster.py
@@ -114,6 +114,10 @@ class LocalCUDACluster(LocalCluster):
memory on the GPU is used. ``rmm_pool_size`` must be specified to use RMM pool
and to set the maximum pool size.

.. note::
When paired with `--enable-rmm-async` the maximum size cannot be guaranteed
due to fragmentation.

.. note::
This size is a per-worker configuration, and not cluster-wide.
rmm_managed_memory : bool, default False
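
A minimal, hedged usage sketch of the parameters this docstring describes (values are illustrative):

# Per-worker pool of 2 GiB, capped best-effort at 4 GiB; with rmm_async=True the
# cap is enforced by a limiting adaptor and cannot be strictly guaranteed.
from distributed import Client

from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(
    rmm_pool_size="2GiB",
    rmm_maximum_pool_size="4GiB",
    rmm_async=True,
)
client = Client(cluster)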