Support for LocalCUDACluster with MIG (#674)
Adds support for starting `LocalCUDACluster` and CUDA workers on MIG instances by passing in the UUIDs of the MIG instances. Builds on the existing PR #671.
More specifically, this PR does the following:
1. Allows starting `LocalCUDACluster` as `cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=["MIG-uuid1","MIG-uuid2",...])`, or by passing the UUIDs as a single comma-separated string (see the sketch below).
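
For illustration, a minimal sketch of the new usage, assuming MIG is already enabled on the GPU and reusing the placeholder UUIDs from the `nvml_device_index` doctest in this PR (on a real system, substitute the MIG UUIDs reported by `nvidia-smi -L`):

```python
from dask.distributed import Client

from dask_cuda import LocalCUDACluster

# Placeholder MIG instance UUIDs; take real values from `nvidia-smi -L`.
mig_uuids = [
    "MIG-41b3359c-e721-56e5-8009-12e5797ed514",
    "MIG-65b79fff-6d3c-5490-a288-b31ec705f310",
]

# Pass the UUIDs as a list ...
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=mig_uuids)
client = Client(cluster)

# ... or, equivalently, as a single comma-separated string:
# cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=",".join(mig_uuids))
```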

Needs Discussion:
0. Apart from manually testing on a MIG instance on the cloud, how would we test this?
1. What if the user does not pass any argument to `LocalCUDACluster` while using MIG instances? By default `LocalCUDACluster` will try to use all of the parent GPUs and will run into an error.
2. What if we have a deployment with a mix of MIG-enabled and non-MIG-enabled GPUs?
3. `dask.distributed` diagnostics will also fail if we run on MIG-enabled GPUs, since at the moment they use `pynvml` APIs that only handle non-MIG-enabled GPUs (the new tests work around this by disabling NVML diagnostics; see the sketch after this list).
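
As a point of reference for items 0 and 3, the tests added here work around both limitations by disabling NVML-based diagnostics and skipping when no MIG devices are present. A minimal sketch of that pattern, using the `get_gpu_count_mig` helper added in this PR (the `mig_uuids_or_skip` wrapper itself is hypothetical):

```python
import os

import pytest

from dask_cuda.utils import get_gpu_count_mig  # helper added in this PR


def mig_uuids_or_skip():
    # Disable NVML-based diagnostics, which currently only understand non-MIG GPUs.
    os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
    # get_gpu_count_mig(return_uuids=True) returns (count, [uuid, ...]),
    # where each UUID is a bytes object.
    uuids = get_gpu_count_mig(return_uuids=True)[1]
    if not uuids:
        pytest.skip("No MIG devices found")
    return [u.decode("utf-8") for u in uuids]
```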

Authors:
  - Anirban Das (https://github.com/akaanirban)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #674
akaanirban authored Aug 2, 2021
1 parent bc47872 commit b3bda5d
Showing 5 changed files with 199 additions and 18 deletions.
54 changes: 52 additions & 2 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -5,14 +5,14 @@

import pytest

from distributed import Client
from distributed import Client, wait
from distributed.system import MEMORY_LIMIT
from distributed.utils_test import loop # noqa: F401
from distributed.utils_test import popen

import rmm

from dask_cuda.utils import get_n_gpus, wait_workers
from dask_cuda.utils import get_gpu_count_mig, get_n_gpus, wait_workers

_driver_version = rmm._cuda.gpu.driverGetVersion()
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()
@@ -186,3 +186,53 @@ def test_unknown_argument():
ret = subprocess.run(["dask-cuda-worker", "--my-argument"], capture_output=True)
assert ret.returncode != 0
assert b"Scheduler address: --my-argument" in ret.stderr


def test_cuda_mig_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811
init_nvmlstatus = os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__NVML")
try:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
uuids = get_gpu_count_mig(return_uuids=True)[1]
# test only with some MIG Instances assuming the test bed
# does not have a huge number of mig instances
if len(uuids) > 0:
uuids = [i.decode("utf-8") for i in uuids]
else:
pytest.skip("No MIG devices found")
CUDA_VISIBLE_DEVICES = ",".join(uuids)
os.environ["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES
nthreads = len(uuids)
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9359",
"--host",
"127.0.0.1",
"--nthreads",
str(nthreads),
"--no-dashboard",
"--worker-class",
"dask_cuda.utils.MockWorker",
]
):
with Client("127.0.0.1:9359", loop=loop) as client:
assert wait_workers(client, n_gpus=len(uuids))
# Check to see if all workers are up and
# CUDA_VISIBLE_DEVICES cycles properly

def get_visible_devices():
return os.environ["CUDA_VISIBLE_DEVICES"]

result = client.run(get_visible_devices)
wait(result)
assert all(len(v.split(",")) == len(uuids) for v in result.values())
for i in range(len(uuids)):
assert set(v.split(",")[i] for v in result.values()) == set(
uuids
)
finally:
if "CUDA_VISIBLE_DEVICES" in os.environ:
del os.environ["CUDA_VISIBLE_DEVICES"]
if init_nvmlstatus:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = init_nvmlstatus
39 changes: 38 additions & 1 deletion dask_cuda/tests/test_local_cuda_cluster.py
@@ -10,7 +10,7 @@

from dask_cuda import CUDAWorker, LocalCUDACluster, utils
from dask_cuda.initialize import initialize
from dask_cuda.utils import MockWorker
from dask_cuda.utils import MockWorker, get_gpu_count_mig

_driver_version = rmm._cuda.gpu.driverGetVersion()
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()
@@ -206,3 +206,40 @@ async def test_cluster_worker():
await new_worker
await client.wait_for_workers(2)
await new_worker.close()


@gen_test(timeout=20)
async def test_available_mig_workers():
import dask

init_nvmlstatus = os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__NVML")
try:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
dask.config.refresh()
uuids = get_gpu_count_mig(return_uuids=True)[1]
if len(uuids) > 0:
uuids = [i.decode("utf-8") for i in uuids]
else:
pytest.skip("No MIG devices found")
CUDA_VISIBLE_DEVICES = ",".join(uuids)
os.environ["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES
async with LocalCUDACluster(
CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES, asynchronous=True
) as cluster:
async with Client(cluster, asynchronous=True) as client:
assert len(cluster.workers) == len(uuids)

# Check to see if CUDA_VISIBLE_DEVICES cycles properly
def get_visible_devices():
return os.environ["CUDA_VISIBLE_DEVICES"]

result = await client.run(get_visible_devices)

assert all(len(v.split(",")) == len(uuids) for v in result.values())
for i in range(len(uuids)):
assert set(v.split(",")[i] for v in result.values()) == set(uuids)
finally:
if "CUDA_VISIBLE_DEVICES" in os.environ:
del os.environ["CUDA_VISIBLE_DEVICES"]
if init_nvmlstatus:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = init_nvmlstatus
29 changes: 29 additions & 0 deletions dask_cuda/tests/test_utils.py
@@ -249,3 +249,32 @@ def test_parse_device_memory_limit():
assert parse_device_memory_limit(0.8) == int(total * 0.8)
assert parse_device_memory_limit(1000000000) == 1000000000
assert parse_device_memory_limit("1GB") == 1000000000


def test_parse_visible_mig_devices():
pynvml = pytest.importorskip("pynvml")
pynvml.nvmlInit()
for index in range(get_gpu_count()):
handle = pynvml.nvmlDeviceGetHandleByIndex(index)
try:
mode = pynvml.nvmlDeviceGetMigMode(handle)[0]
except pynvml.NVMLError:
# if not a MIG device, i.e. a normal GPU, skip
continue
if mode:
# Just checks to see if there are any MIG-enabled GPUs.
# If there is one, check that the number of MIG instances
# on that GPU is <= count, where count gives us the
# maximum number of MIG devices/instances that can exist
# under a given parent NVML device.
count = pynvml.nvmlDeviceGetMaxMigDeviceCount(handle)
miguuids = []
for i in range(count):
try:
mighandle = pynvml.nvmlDeviceGetMigDeviceHandleByIndex(
device=handle, index=i
)
miguuids.append(mighandle)
except pynvml.NVMLError:
pass
assert len(miguuids) <= count
93 changes: 79 additions & 14 deletions dask_cuda/utils.py
Expand Up @@ -130,13 +130,50 @@ def get_gpu_count():
return pynvml.nvmlDeviceGetCount()


def get_cpu_affinity(device_index):
@toolz.memoize
def get_gpu_count_mig(return_uuids=False):
"""Return the number of MIG instances available
Parameters
----------
return_uuids: bool
Optionally also return the UUIDs of the available MIG instances
"""
pynvml.nvmlInit()
uuids = []
for index in range(get_gpu_count()):
handle = pynvml.nvmlDeviceGetHandleByIndex(index)
try:
is_mig_mode = pynvml.nvmlDeviceGetMigMode(handle)[0]
except pynvml.NVMLError:
# if not a MIG device, i.e. a normal GPU, skip
continue
if is_mig_mode:
count = pynvml.nvmlDeviceGetMaxMigDeviceCount(handle)
miguuids = []
for i in range(count):
try:
mighandle = pynvml.nvmlDeviceGetMigDeviceHandleByIndex(
device=handle, index=i
)
miguuids.append(mighandle)
uuids.append(pynvml.nvmlDeviceGetUUID(mighandle))
except pynvml.NVMLError:
pass
if return_uuids:
return len(uuids), uuids
return len(uuids)


def get_cpu_affinity(device_index=None):
"""Get a list containing the CPU indices to which a GPU is directly connected.
Use either the device index or the specified device identifier UUID.
Parameters
----------
device_index: int
Index of the GPU device
device_index: int or str
Index or UUID of the GPU device
Examples
--------
@@ -158,10 +195,19 @@ def get_cpu_affinity(device_index):
pynvml.nvmlInit()

try:
if device_index and not str(device_index).isnumeric():
# This means device_index is UUID.
# This works for both MIG and non-MIG device UUIDs.
handle = pynvml.nvmlDeviceGetHandleByUUID(str.encode(device_index))
if pynvml.nvmlDeviceIsMigDeviceHandle(handle):
# Additionally get parent device handle
# if the device itself is a MIG instance
handle = pynvml.nvmlDeviceGetDeviceHandleFromMigDeviceHandle(handle)
else:
handle = pynvml.nvmlDeviceGetHandleByIndex(device_index)
# Result is a list of 64-bit integers, thus ceil(get_cpu_count() / 64)
affinity = pynvml.nvmlDeviceGetCpuAffinity(
pynvml.nvmlDeviceGetHandleByIndex(device_index),
math.ceil(get_cpu_count() / 64),
handle, math.ceil(get_cpu_count() / 64),
)
return unpack_bitmask(affinity)
except pynvml.NVMLError:
@@ -181,12 +227,17 @@ def get_n_gpus():

def get_device_total_memory(index=0):
"""
Return total memory of CUDA device with index
Return total memory of CUDA device with index or with device identifier UUID
"""
pynvml.nvmlInit()
return pynvml.nvmlDeviceGetMemoryInfo(
pynvml.nvmlDeviceGetHandleByIndex(index)
).total

if index and not str(index).isnumeric():
# This means index is UUID. This works for both MIG and non-MIG device UUIDs.
handle = pynvml.nvmlDeviceGetHandleByUUID(str.encode(str(index)))
else:
# This is a device index
handle = pynvml.nvmlDeviceGetHandleByIndex(index)
return pynvml.nvmlDeviceGetMemoryInfo(handle).total


def get_ucx_net_devices(
@@ -464,12 +515,13 @@ def parse_cuda_visible_device(dev):
try:
return int(dev)
except ValueError:
if any(dev.startswith(prefix) for prefix in ["GPU-", "MIG-GPU-"]):
if any(dev.startswith(prefix) for prefix in ["GPU-", "MIG-GPU-", "MIG-"]):
return dev
else:
raise ValueError(
"Devices in CUDA_VISIBLE_DEVICES must be comma-separated integers "
"or strings beginning with 'GPU-' or 'MIG-GPU-' prefixes."
"or strings beginning with 'GPU-' or 'MIG-GPU-' prefixes"
" or 'MIG-<UUID>'."
)


@@ -514,13 +566,25 @@ def nvml_device_index(i, CUDA_VISIBLE_DEVICES):
1
>>> nvml_device_index(1, [1,2,3,0])
2
>>> nvml_device_index(1, ["GPU-84fd49f2-48ad-50e8-9f2e-3bf0dfd47ccb",
"GPU-d6ac2d46-159b-5895-a854-cb745962ef0f",
"GPU-158153b7-51d0-5908-a67c-f406bc86be17"])
"GPU-d6ac2d46-159b-5895-a854-cb745962ef0f"
>>> nvml_device_index(2, ["MIG-41b3359c-e721-56e5-8009-12e5797ed514",
"MIG-65b79fff-6d3c-5490-a288-b31ec705f310",
"MIG-c6e2bae8-46d4-5a7e-9a68-c6cf1f680ba0"])
"MIG-c6e2bae8-46d4-5a7e-9a68-c6cf1f680ba0"
>>> nvml_device_index(1, 2)
Traceback (most recent call last):
...
ValueError: CUDA_VISIBLE_DEVICES must be `str` or `list`
"""
if isinstance(CUDA_VISIBLE_DEVICES, str):
return int(CUDA_VISIBLE_DEVICES.split(",")[i])
ith_elem = CUDA_VISIBLE_DEVICES.split(",")[i]
if ith_elem.isnumeric():
return int(ith_elem)
else:
return ith_elem
elif isinstance(CUDA_VISIBLE_DEVICES, list):
return CUDA_VISIBLE_DEVICES[i]
else:
@@ -537,8 +601,9 @@ def parse_device_memory_limit(device_memory_limit, device_index=0):
This can be a float (fraction of total device memory), an integer (bytes),
a string (like 5GB or 5000M), and "auto", 0 or None for the total device
size.
device_index: int
The index of device from which to obtain the total memory amount.
device_index: int or str
The index or UUID of the device from which to obtain the total memory amount.
Default: 0.
Examples
--------
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
dask>=2.22.0
distributed>=2.22.0
pynvml>=8.0.3
pynvml>=11.0.0
numpy>=1.16.0
numba>=0.53.1
