diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index de2a2d0a7..3045f0fff 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -30,6 +30,7 @@ get_n_gpus, get_ucx_config, get_ucx_net_devices, + nvml_device_index, parse_device_memory_limit, ) @@ -219,7 +220,9 @@ def del_pid_file(): security=security, env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)}, plugins={ - CPUAffinity(get_cpu_affinity(i)), + CPUAffinity( + get_cpu_affinity(nvml_device_index(i, cuda_visible_devices(i))) + ), RMMSetup( rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, ), @@ -236,7 +239,7 @@ def del_pid_file(): cuda_device_index=i, ) }, - data=data(i), + data=data(nvml_device_index(i, cuda_visible_devices(i))), worker_class=worker_class, **kwargs, ) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 0276a4b6b..8d08381f1 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -17,6 +17,7 @@ get_cpu_affinity, get_ucx_config, get_ucx_net_devices, + nvml_device_index, parse_cuda_visible_device, parse_device_memory_limit, ) @@ -215,7 +216,7 @@ def __init__( memory_limit, threads_per_worker, n_workers ) self.device_memory_limit = parse_device_memory_limit( - device_memory_limit, device_index=0 + device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES) ) self.rmm_pool_size = rmm_pool_size @@ -361,7 +362,9 @@ def new_worker_spec(self): { "env": {"CUDA_VISIBLE_DEVICES": visible_devices,}, "plugins": { - CPUAffinity(get_cpu_affinity(worker_count)), + CPUAffinity( + get_cpu_affinity(nvml_device_index(0, visible_devices)) + ), RMMSetup( self.rmm_pool_size, self.rmm_managed_memory, diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index b56e1b6c1..5bb170197 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -12,6 +12,7 @@ get_preload_options, get_ucx_config, get_ucx_net_devices, + nvml_device_index, parse_cuda_visible_device, parse_device_memory_limit, unpack_bitmask, @@ -59,6 +60,18 @@ def test_cpu_affinity(): assert os.sched_getaffinity(0) == set(affinity) +def test_cpu_affinity_and_cuda_visible_devices(): + affinity = dict() + for i in range(get_n_gpus()): + # The negative here would be `device = 0` as required for CUDA runtime + # calls. + device = nvml_device_index(0, cuda_visible_devices(i)) + affinity[device] = get_cpu_affinity(device) + + for i in range(get_n_gpus()): + assert get_cpu_affinity(i) == affinity[i] + + def test_get_device_total_memory(): for i in range(get_n_gpus()): with cuda.gpus[i]: diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 9c6c4dcff..d1c581acc 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -493,6 +493,37 @@ def cuda_visible_devices(i, visible=None): return ",".join(map(str, L)) +def nvml_device_index(i, CUDA_VISIBLE_DEVICES): + """Get the device index for NVML addressing + + NVML expects the index of the physical device, unlike CUDA runtime which + expects the address relative to `CUDA_VISIBLE_DEVICES`. This function + returns the i-th device index from the `CUDA_VISIBLE_DEVICES` + comma-separated string of devices or list. + + Examples + -------- + >>> nvml_device_index(1, "0,1,2,3") + 1 + >>> nvml_device_index(1, "1,2,3,0") + 2 + >>> nvml_device_index(1, [0,1,2,3]) + 1 + >>> nvml_device_index(1, [1,2,3,0]) + 2 + >>> nvml_device_index(1, 2) + Traceback (most recent call last): + ... + ValueError: CUDA_VISIBLE_DEVICES must be `str` or `list` + """ + if isinstance(CUDA_VISIBLE_DEVICES, str): + return int(CUDA_VISIBLE_DEVICES.split(",")[i]) + elif isinstance(CUDA_VISIBLE_DEVICES, list): + return CUDA_VISIBLE_DEVICES[i] + else: + raise ValueError("`CUDA_VISIBLE_DEVICES` must be `str` or `list`") + + def parse_device_memory_limit(device_memory_limit, device_index=0): """Parse memory limit to be used by a CUDA device.