Add device memory spill support (LRU-based only) (rapidsai#51)
* Add DeviceHostFile class to handle memory-spilling in LRU fashion
* Add CuPy and enable __array_function__ in CI build
* Update version requirements of dask, distributed and numpy
* Add get_device_total_memory utility function
* Add numba as a requirement
* Add --device-memory-limit parameter to dask-cuda-worker
* Add `fast` to DeviceHostFile for Worker compatibility
* Fix some setup.py formatting
* Fix LocalCUDACluster device_memory_limit parsing and local dir creation
pentschev authored and mrocklin committed May 15, 2019
1 parent 23efa90 commit 6c4004b
Showing 12 changed files with 525 additions and 19 deletions.
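For context, the spilling added by this commit is LRU-based and built on zict's Buffer: when a fast store exceeds its byte limit, least-recently-used entries are evicted into a slower store (device to host, host to disk). Below is a minimal sketch of that mechanism, assuming only that zict is installed; the stores and limits are illustrative, not the actual worker configuration:

    from zict import Buffer

    fast = dict()   # stands in for device (or host) memory
    slow = dict()   # stands in for host memory (or disk)

    # Keep at most 2 units of weight in `fast`; each item weighs 1 by default.
    buf = Buffer(fast, slow, n=2)

    buf["a"] = 1
    buf["b"] = 2
    buf["c"] = 3    # over the limit: least-recently-used key "a" spills to `slow`

    assert sorted(fast) == ["b", "c"]
    assert sorted(slow) == ["a"]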
11 changes: 11 additions & 0 deletions ci/gpu/build.sh
@@ -19,6 +19,10 @@ cd $WORKSPACE
export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags`
export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count`

# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x,
# will possibly be enabled by default starting on 1.17)
export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1

################################################################################
# SETUP - Check environment
################################################################################
@@ -38,6 +42,13 @@ conda list
# FIX Added to deal with Anaconda SSL verification issues during conda builds
conda config --set ssl_verify False

################################################################################
# SETUP - Install additional packages
################################################################################

# Install CuPy for tests
pip install cupy-cuda100==6.0.0rc1

################################################################################
# TEST - Run tests
################################################################################
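The two additions above, the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION export and the CuPy install, let the test suite call NumPy functions on GPU arrays. A rough illustration of what the protocol enables (not part of the diff; assumes NumPy 1.16.x with the variable set, CuPy >= 6.0, and a visible GPU):

    import numpy as np
    import cupy

    x = cupy.arange(6).reshape(2, 3)    # data lives on the GPU
    s = np.sum(x, axis=0)               # np.sum dispatches to cupy.sum via
                                        # __array_function__
    assert isinstance(s, cupy.ndarray)  # the result stays a CuPy array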
6 changes: 4 additions & 2 deletions conda/recipes/dask-cuda/meta.yaml
@@ -21,8 +21,10 @@ requirements:
- setuptools
run:
- python x.x
- dask-core >=1.1.4
- distributed >=1.25.2
- dask-core >=1.2.1
- distributed >=1.28.0
- numpy >=1.16.0
- numba >=0.40.1

test:
imports:
52 changes: 47 additions & 5 deletions dask_cuda/dask_cuda_worker.py
@@ -8,8 +8,14 @@
import click
from distributed import Nanny, Worker
from distributed.config import config
from distributed.utils import get_ip_interface, parse_timedelta
from distributed.worker import _ncores
from distributed.diskutils import WorkSpace
from distributed.utils import (
get_ip_interface,
parse_timedelta,
parse_bytes,
warn_on_duration,
)
from distributed.worker import _ncores, parse_memory_limit
from distributed.security import Security
from distributed.cli.utils import (
check_python_3,
@@ -23,8 +29,9 @@
enable_proctitle_on_current,
)

from .device_host_file import DeviceHostFile
from .local_cuda_cluster import cuda_visible_devices
from .utils import get_n_gpus
from .utils import get_n_gpus, get_device_total_memory

from toolz import valmap
from tornado.ioloop import IOLoop, TimeoutError
@@ -98,6 +105,16 @@
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management",
)
@click.option(
"--device-memory-limit",
default="auto",
help="Bytes of memory per CUDA device that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total device memory), "
"string (like 5GB or 5000M), "
"'auto', or zero for no memory management "
"(i.e., allow full device memory usage).",
)
@click.option(
"--reconnect/--no-reconnect",
default=True,
@@ -146,6 +163,7 @@ def main(
nthreads,
name,
memory_limit,
device_memory_limit,
pid_file,
reconnect,
resources,
@@ -175,7 +193,7 @@
nprocs = get_n_gpus()

if not nthreads:
nthreads = min(1, _ncores // nprocs )
nthreads = min(1, _ncores // nprocs)

if pid_file:
with open(pid_file, "w") as f:
@@ -234,6 +252,18 @@ def del_pid_file():
if death_timeout is not None:
death_timeout = parse_timedelta(death_timeout, "s")

local_dir = kwargs.get("local_dir", "dask-worker-space")
with warn_on_duration(
"1s",
"Creating scratch directories is taking a surprisingly long time. "
"This is often due to running workers on a network file system. "
"Consider specifying a local-directory to point workers to write "
"scratch data to a local disk.",
):
_workspace = WorkSpace(os.path.abspath(local_dir))
_workdir = _workspace.new_work_dir(prefix="worker-")
local_dir = _workdir.dir_path

nannies = [
t(
scheduler,
@@ -252,7 +282,19 @@ def del_pid_file():
contact_address=None,
env={"CUDA_VISIBLE_DEVICES": cuda_visible_devices(i)},
name=name if nprocs == 1 or not name else name + "-" + str(i),
**kwargs
data=(
DeviceHostFile,
{
"device_memory_limit": get_device_total_memory(index=i)
if (device_memory_limit == "auto" or device_memory_limit == int(0))
else parse_bytes(device_memory_limit),
"memory_limit": parse_memory_limit(
memory_limit, nthreads, total_cores=nprocs
),
"local_dir": local_dir,
},
),
**kwargs,
)
for i in range(nprocs)
]
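With this change a worker can be started with, for example, dask-cuda-worker <scheduler-address> --device-memory-limit 4GB. Each worker process then resolves the option roughly as in the data=(DeviceHostFile, {...}) block above; the helper below is a hypothetical restatement of that logic, not code from the commit:

    from distributed.utils import parse_bytes
    from dask_cuda.utils import get_device_total_memory

    def resolve_device_memory_limit(value, device_index):
        # "auto" (or zero) means "use the full memory of this GPU"
        if value == "auto" or value == 0:
            return get_device_total_memory(index=device_index)
        return parse_bytes(value)   # e.g. "4GB" -> 4000000000 bytes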
107 changes: 107 additions & 0 deletions dask_cuda/device_host_file.py
@@ -0,0 +1,107 @@
from zict import Buffer, File, Func
from zict.common import ZictBase
from distributed.protocol import deserialize_bytes, serialize_bytes
from distributed.worker import weight

from functools import partial
import os


def _is_device_object(obj):
"""
    Check if obj is a device object, by checking if it has a
    __cuda_array_interface__ attribute
"""
return hasattr(obj, "__cuda_array_interface__")


def _serialize_if_device(obj):
""" Serialize an object if it's a device object """
if _is_device_object(obj):
return serialize_bytes(obj, on_error="raise")
else:
return obj


def _deserialize_if_device(obj):
""" Deserialize an object if it's an instance of bytes """
if isinstance(obj, bytes):
return deserialize_bytes(obj)
else:
return obj


class DeviceHostFile(ZictBase):
""" Manages serialization/deserialization of objects.
Three LRU cache levels are controlled, for device, host and disk.
Each level takes care of serializing objects once its limit has been
reached and pass it to the subsequent level. Similarly, each cache
may deserialize the object, but storing it back in the appropriate
cache, depending on the type of object being deserialized.
Parameters
----------
device_memory_limit: int
Number of bytes of CUDA device memory for device LRU cache,
spills to host cache once filled.
memory_limit: int
Number of bytes of host memory for host LRU cache, spills to
disk once filled.
local_dir: path
Path where to store serialized objects on disk
"""

def __init__(
self, device_memory_limit=None, memory_limit=None, local_dir="dask-worker-space"
):
path = os.path.join(local_dir, "storage")

self.host_func = dict()
self.disk_func = Func(
partial(serialize_bytes, on_error="raise"), deserialize_bytes, File(path)
)
self.host_buffer = Buffer(
self.host_func, self.disk_func, memory_limit, weight=weight
)

self.device_func = dict()
self.device_host_func = Func(
_serialize_if_device, _deserialize_if_device, self.host_buffer
)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)

self.device = self.device_buffer.fast.d
self.host = self.host_buffer.fast.d
self.disk = self.host_buffer.slow.d

# For Worker compatibility only, where `fast` is host memory buffer
self.fast = self.host_buffer.fast

def __setitem__(self, key, value):
if _is_device_object(value):
self.device_buffer[key] = value
else:
self.host_buffer[key] = value

def __getitem__(self, key):
if key in self.host_buffer:
obj = self.host_buffer[key]
del self.host_buffer[key]
self.device_buffer[key] = _deserialize_if_device(obj)

if key in self.device_buffer:
return self.device_buffer[key]
else:
raise KeyError

def __len__(self):
return len(self.device_buffer)

def __iter__(self):
return iter(self.device_buffer)

def __delitem__(self, i):
del self.device_buffer[i]
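A usage sketch for the new class (assumes CuPy for the device-side array and a working GPU; the limits and directory below are placeholders):

    import numpy as np
    import cupy
    from dask_cuda.device_host_file import DeviceHostFile

    dhf = DeviceHostFile(
        device_memory_limit=1024 ** 3,      # 1 GiB device LRU cache
        memory_limit=4 * 1024 ** 3,         # 4 GiB host LRU cache
        local_dir="dask-worker-space",      # disk spill location
    )

    dhf["gpu"] = cupy.arange(10)    # exposes __cuda_array_interface__,
                                    # so it lands in the device buffer
    dhf["cpu"] = np.arange(10)      # plain host object, goes to the host buffer

    assert "gpu" in dhf.device
    assert "cpu" in dhf.host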
70 changes: 67 additions & 3 deletions dask_cuda/local_cuda_cluster.py
@@ -1,11 +1,16 @@
import os
import warnings

from tornado import gen

from dask.distributed import LocalCluster
from distributed.worker import TOTAL_MEMORY
from distributed.diskutils import WorkSpace
from distributed.nanny import Nanny
from distributed.worker import Worker, TOTAL_MEMORY
from distributed.utils import parse_bytes, warn_on_duration

from .utils import get_n_gpus
from .device_host_file import DeviceHostFile
from .utils import get_n_gpus, get_device_total_memory


def cuda_visible_devices(i, visible=None):
@@ -35,7 +40,8 @@ def __init__(
threads_per_worker=1,
processes=True,
memory_limit=None,
**kwargs
device_memory_limit=None,
**kwargs,
):
if n_workers is None:
n_workers = get_n_gpus()
@@ -45,6 +51,9 @@ def __init__(
raise ValueError("Can not specify more processes than GPUs")
if memory_limit is None:
memory_limit = TOTAL_MEMORY / n_workers
self.host_memory_limit = memory_limit
self.device_memory_limit = device_memory_limit

LocalCluster.__init__(
self,
n_workers=n_workers,
@@ -82,3 +91,58 @@ def _start(self, ip=None, n_workers=0):
self.status = "running"

raise gen.Return(self)

@gen.coroutine
def _start_worker(self, death_timeout=60, **kwargs):
if self.status and self.status.startswith("clos"):
warnings.warn("Tried to start a worker while status=='%s'" % self.status)
return

if self.processes:
W = Nanny
kwargs["quiet"] = True
else:
W = Worker

local_dir = kwargs.get("local_dir", "dask-worker-space")
with warn_on_duration(
"1s",
"Creating scratch directories is taking a surprisingly long time. "
"This is often due to running workers on a network file system. "
"Consider specifying a local-directory to point workers to write "
"scratch data to a local disk.",
):
_workspace = WorkSpace(os.path.abspath(local_dir))
_workdir = _workspace.new_work_dir(prefix="worker-")
local_dir = _workdir.dir_path

device_index = int(kwargs["env"]["CUDA_VISIBLE_DEVICES"].split(",")[0])
if self.device_memory_limit is None:
self.device_memory_limit = get_device_total_memory(device_index)
elif isinstance(self.device_memory_limit, str):
self.device_memory_limit = parse_bytes(self.device_memory_limit)
data = DeviceHostFile(
device_memory_limit=self.device_memory_limit,
memory_limit=self.host_memory_limit,
local_dir=local_dir,
)

w = yield W(
self.scheduler.address,
loop=self.loop,
death_timeout=death_timeout,
silence_logs=self.silence_logs,
data=data,
**kwargs,
)

self.workers.append(w)

while w.status != "closed" and w.worker_address not in self.scheduler.workers:
yield gen.sleep(0.01)

if w.status == "closed" and self.scheduler.status == "running":
self.workers.remove(w)
raise gen.TimeoutError("Worker failed to start")

raise gen.Return(w)
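The same limit can also be set through the Python API; a brief sketch (the value is a placeholder and at least one GPU must be visible):

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # One worker per visible GPU; each spills device memory to host once it
    # holds more than ~4 GB of CUDA objects, and spills host memory to disk
    # according to memory_limit.
    cluster = LocalCUDACluster(device_memory_limit="4GB")
    client = Client(cluster)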
2 changes: 2 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -25,6 +25,8 @@ def test_cuda_visible_devices(loop):
"127.0.0.1:9359",
"--host",
"127.0.0.1",
"--device-memory-limit",
"1 MB",
"--no-bokeh",
]
) as worker:
