From c7633fdec8769c755e0a253fe6bcb5a2c6655645 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 12 Feb 2021 15:50:31 +0100 Subject: [PATCH 01/12] fix type hints --- dask_cuda/proxify_device_objects.py | 6 +++--- dask_cuda/proxify_host_file.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_cuda/proxify_device_objects.py b/dask_cuda/proxify_device_objects.py index 7c166003f..fdd2130ae 100644 --- a/dask_cuda/proxify_device_objects.py +++ b/dask_cuda/proxify_device_objects.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, List, MutableMapping from dask.utils import Dispatch @@ -9,7 +9,7 @@ def proxify_device_objects( obj: Any, - proxied_id_to_proxy: Dict[int, ProxyObject], + proxied_id_to_proxy: MutableMapping[int, ProxyObject], found_proxies: List[ProxyObject], ): """ Wrap device objects in ProxyObject @@ -22,7 +22,7 @@ def proxify_device_objects( ---------- obj: Any Object to search through or wrap in a ProxyObject. - proxied_id_to_proxy: Dict[int, ProxyObject] + proxied_id_to_proxy: MutableMapping[int, ProxyObject] Dict mapping the id() of proxied objects (CUDA device objects) to their proxy and is updated with all new proxied objects found in `obj`. found_proxies: List[ProxyObject] diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index a47669c37..48100f2f6 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -164,7 +164,7 @@ def get_dev_buffer_to_proxies(self) -> DefaultDict[Hashable, List[ProxyObject]]: with self.lock: # Notice, multiple proxy object can point to different non-overlapping # parts of the same device buffer. - ret = DefaultDict(list) + ret = defaultdict(list) for proxy in self.proxies_tally.get_unspilled_proxies(): for dev_buffer in proxy._obj_pxy_get_device_memory_objects(): ret[dev_buffer].append(proxy) From a9cf4cf531e35b114cda151d5f865ac803f5f025 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 16 Feb 2021 11:25:53 +0100 Subject: [PATCH 02/12] tests: minor clean up --- dask_cuda/tests/test_proxify_host_file.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index a03c784b0..777b1c532 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -22,13 +22,13 @@ def test_one_item_limit(): # Check k1 is spilled because of the newer k2 k1 = dhf["k1"] + k2 = dhf["k2"] assert k1._obj_pxy_is_serialized() - assert not dhf["k2"]._obj_pxy_is_serialized() + assert not k2._obj_pxy_is_serialized() # Accessing k1 spills k2 and unspill k1 k1_val = k1[0] assert k1_val == 1 - k2 = dhf["k2"] assert k2._obj_pxy_is_serialized() # Duplicate arrays changes nothing @@ -50,11 +50,11 @@ def test_one_item_limit(): # Deleting k2 does not change anything since k3 still holds a # reference to the underlying proxy object - assert dhf.proxies_tally.get_dev_mem_usage() == 8 + assert dhf.proxies_tally.get_dev_mem_usage() == itemsize p1 = list(dhf.proxies_tally.get_unspilled_proxies()) assert len(p1) == 1 del dhf["k2"] - assert dhf.proxies_tally.get_dev_mem_usage() == 8 + assert dhf.proxies_tally.get_dev_mem_usage() == itemsize p2 = list(dhf.proxies_tally.get_unspilled_proxies()) assert len(p2) == 1 assert p1[0] is p2[0] From 72250469af2a2099dd3193263027d3940da57861 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 16 Feb 2021 16:04:46 +0100 Subject: [PATCH 03/12] Start of implementing externals --- dask_cuda/proxify_host_file.py | 21 ++++++++++++++++++++- dask_cuda/proxy_object.py | 6 ++++-- dask_cuda/tests/test_proxify_host_file.py | 22 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 48100f2f6..70cd0cb4e 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -181,13 +181,32 @@ def get_access_info(self) -> Tuple[int, List[Tuple[int, int, List[ProxyObject]]] total_dev_mem_usage += size return total_dev_mem_usage, dev_buf_access + def add_external(self, obj): + found_proxies: List[ProxyObject] = [] + proxied_id_to_proxy = self.proxies_tally.get_proxied_id_to_proxy() + ret = proxify_device_objects(obj, proxied_id_to_proxy, found_proxies) + last_access = time.time() + self_weakref = weakref.ref(self) + for p in found_proxies: + weakref.finalize(p, self.del_external, id(p)) + external = weakref.proxy(p) + p._obj_pxy["hostfile"] = self_weakref + p._obj_pxy["last_access"] = last_access + p._obj_pxy["external"] = external + self.proxies_tally.add_key(id(p), [external]) + self.maybe_evict() + return ret + + def del_external(self, name): + self.proxies_tally.del_key(name) + def __setitem__(self, key, value): with self.lock: if key in self.store: # Make sure we register the removal of an existing key del self[key] - found_proxies = [] + found_proxies: List[ProxyObject] = [] proxied_id_to_proxy = self.proxies_tally.get_proxied_id_to_proxy() self.store[key] = proxify_device_objects( value, proxied_id_to_proxy, found_proxies diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 26aa8ce77..113489783 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -277,7 +277,8 @@ def _obj_pxy_serialize(self, serializers): self._obj_pxy["serializers"] = serializers hostfile = self._obj_pxy.get("hostfile", lambda: None)() if hostfile is not None: - hostfile.proxies_tally.spill_proxy(self) + external = self._obj_pxy.get("external", self) + hostfile.proxies_tally.spill_proxy(external) # Invalidate the (possible) cached "device_memory_objects" self._obj_pxy_cache.pop("device_memory_objects", None) @@ -311,7 +312,8 @@ def _obj_pxy_deserialize(self): self._obj_pxy["obj"] = distributed.protocol.deserialize(header, frames) self._obj_pxy["serializers"] = None if hostfile is not None: - hostfile.proxies_tally.unspill_proxy(self) + external = self._obj_pxy.get("external", self) + hostfile.proxies_tally.unspill_proxy(external) self._obj_pxy["last_access"] = time.time() return self._obj_pxy["obj"] diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 777b1c532..f1c0869c8 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -129,3 +129,25 @@ def test_cudf_get_device_memory_objects(): ] res = get_device_memory_objects(objects) assert len(res) == 4, "We expect four buffer objects" + + +def test_externals(): + dhf = ProxifyHostFile(device_memory_limit=itemsize) + dhf["k1"] = cupy.arange(1) + 1 + k1 = dhf["k1"] + k2 = dhf.add_external(cupy.arange(1) + 1) + # `k2` isn't part of the store but still triggers spilling of `k1` + assert len(dhf) == 1 + assert k1._obj_pxy_is_serialized() + assert not k2._obj_pxy_is_serialized() + k1[0] # Trigger spilling of `k2` + assert not k1._obj_pxy_is_serialized() + assert k2._obj_pxy_is_serialized() + k2[0] # Trigger spilling of `k1` + assert k1._obj_pxy_is_serialized() + assert not k2._obj_pxy_is_serialized() + assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + # Removing `k2` also removes it from the tally + del k2 + assert dhf.proxies_tally.get_dev_mem_usage() == 0 + assert len(list(dhf.proxies_tally.get_unspilled_proxies())) == 0 From c02096feead61cc0feb7bb01bbec92db740f3d29 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 17 Feb 2021 13:32:40 +0100 Subject: [PATCH 04/12] Support setitem of external object --- dask_cuda/proxify_device_objects.py | 1 - dask_cuda/proxify_host_file.py | 13 +++++++++++-- dask_cuda/tests/test_proxify_host_file.py | 12 ++++++++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/dask_cuda/proxify_device_objects.py b/dask_cuda/proxify_device_objects.py index fdd2130ae..e59bc1c04 100644 --- a/dask_cuda/proxify_device_objects.py +++ b/dask_cuda/proxify_device_objects.py @@ -56,7 +56,6 @@ def proxify_device_object_default(obj, proxied_id_to_proxy, found_proxies): @dispatch.register(ProxyObject) def proxify_device_object_proxy_object(obj, proxied_id_to_proxy, found_proxies): - # We deserialize CUDA-serialized objects since it is very cheap and # makes it easy to administrate device memory usage if obj._obj_pxy_is_serialized() and "cuda" in obj._obj_pxy["serializers"]: diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 70cd0cb4e..5ef325cf4 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -188,12 +188,14 @@ def add_external(self, obj): last_access = time.time() self_weakref = weakref.ref(self) for p in found_proxies: - weakref.finalize(p, self.del_external, id(p)) + name = id(p) + finalize = weakref.finalize(p, self.del_external, name) external = weakref.proxy(p) p._obj_pxy["hostfile"] = self_weakref p._obj_pxy["last_access"] = last_access p._obj_pxy["external"] = external - self.proxies_tally.add_key(id(p), [external]) + p._obj_pxy["external_finalize"] = finalize + self.proxies_tally.add_key(name, [external]) self.maybe_evict() return ret @@ -216,6 +218,13 @@ def __setitem__(self, key, value): for p in found_proxies: p._obj_pxy["hostfile"] = self_weakref p._obj_pxy["last_access"] = last_access + + external_finalize = p._obj_pxy.get("external_finalize", None) + if external_finalize is not None: + external_finalize() + del p._obj_pxy["external"] + del p._obj_pxy["external_finalize"] + self.proxies_tally.add_key(key, found_proxies) self.maybe_evict() diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index f1c0869c8..10d26a6cd 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -151,3 +151,15 @@ def test_externals(): del k2 assert dhf.proxies_tally.get_dev_mem_usage() == 0 assert len(list(dhf.proxies_tally.get_unspilled_proxies())) == 0 + + +def test_externals_setitem(): + dhf = ProxifyHostFile(device_memory_limit=itemsize) + k1 = dhf.add_external(cupy.arange(1) + 1) + assert len(dhf) == 0 + assert "external" in k1._obj_pxy + assert "external_finalize" in k1._obj_pxy + dhf["k1"] = k1 + assert len(dhf) == 1 + assert "external" not in dhf["k1"]._obj_pxy + assert "external_finalize" not in dhf["k1"]._obj_pxy From 13d5a8a773e1998d8e6a7f0b3d2d4a325753c639 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Thu, 18 Feb 2021 13:20:25 +0100 Subject: [PATCH 05/12] handle weakref already in hostfile when setting items --- dask_cuda/proxify_device_objects.py | 12 ++++++++++++ dask_cuda/proxify_host_file.py | 8 ++------ dask_cuda/tests/test_proxify_host_file.py | 16 ++++++++++++++-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/dask_cuda/proxify_device_objects.py b/dask_cuda/proxify_device_objects.py index e59bc1c04..76a39e70e 100644 --- a/dask_cuda/proxify_device_objects.py +++ b/dask_cuda/proxify_device_objects.py @@ -41,6 +41,10 @@ def proxify(obj, proxied_id_to_proxy, found_proxies, subclass=None): _id = id(obj) if _id in proxied_id_to_proxy: ret = proxied_id_to_proxy[_id] + finalize = ret._obj_pxy.get("external_finalize", None) + if finalize: + finalize() + proxied_id_to_proxy[_id] = ret = asproxy(obj, subclass=subclass) else: proxied_id_to_proxy[_id] = ret = asproxy(obj, subclass=subclass) found_proxies.append(ret) @@ -69,6 +73,14 @@ def proxify_device_object_proxy_object(obj, proxied_id_to_proxy, found_proxies): else: proxied_id_to_proxy[_id] = obj + finalize = obj._obj_pxy.get("external_finalize", None) + if finalize: + finalize() + obj = obj._obj_pxy_copy() + if not obj._obj_pxy_is_serialized(): + _id = id(obj._obj_pxy["obj"]) + proxied_id_to_proxy[_id] = obj + found_proxies.append(obj) return obj diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 5ef325cf4..49011f012 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -182,6 +182,7 @@ def get_access_info(self) -> Tuple[int, List[Tuple[int, int, List[ProxyObject]]] return total_dev_mem_usage, dev_buf_access def add_external(self, obj): + # Notice, since `self.store` isn't modified no lock is needed found_proxies: List[ProxyObject] = [] proxied_id_to_proxy = self.proxies_tally.get_proxied_id_to_proxy() ret = proxify_device_objects(obj, proxied_id_to_proxy, found_proxies) @@ -218,12 +219,7 @@ def __setitem__(self, key, value): for p in found_proxies: p._obj_pxy["hostfile"] = self_weakref p._obj_pxy["last_access"] = last_access - - external_finalize = p._obj_pxy.get("external_finalize", None) - if external_finalize is not None: - external_finalize() - del p._obj_pxy["external"] - del p._obj_pxy["external_finalize"] + assert "external" not in p._obj_pxy self.proxies_tally.add_key(key, found_proxies) self.maybe_evict() diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 10d26a6cd..2e37a7c1a 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -156,10 +156,22 @@ def test_externals(): def test_externals_setitem(): dhf = ProxifyHostFile(device_memory_limit=itemsize) k1 = dhf.add_external(cupy.arange(1) + 1) + assert type(k1) is dask_cuda.proxy_object.ProxyObject assert len(dhf) == 0 assert "external" in k1._obj_pxy assert "external_finalize" in k1._obj_pxy dhf["k1"] = k1 + k1 = dhf["k1"] + assert type(k1) is dask_cuda.proxy_object.ProxyObject + assert len(dhf) == 1 + assert "external" not in k1._obj_pxy + assert "external_finalize" not in k1._obj_pxy + + k1 = dhf.add_external(cupy.arange(1) + 1) + k1._obj_pxy_serialize(serializers=("dask", "pickle")) + dhf["k1"] = k1 + k1 = dhf["k1"] + assert type(k1) is dask_cuda.proxy_object.ProxyObject assert len(dhf) == 1 - assert "external" not in dhf["k1"]._obj_pxy - assert "external_finalize" not in dhf["k1"]._obj_pxy + assert "external" not in k1._obj_pxy + assert "external_finalize" not in k1._obj_pxy From 179a39b6dd02098ccb19ec0b1fe294f8c3516d7a Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 19 Feb 2021 15:31:30 +0100 Subject: [PATCH 06/12] Added even more tests of externals --- dask_cuda/tests/test_proxify_host_file.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 2e37a7c1a..7bff3dca3 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -155,7 +155,7 @@ def test_externals(): def test_externals_setitem(): dhf = ProxifyHostFile(device_memory_limit=itemsize) - k1 = dhf.add_external(cupy.arange(1) + 1) + k1 = dhf.add_external(cupy.arange(1)) assert type(k1) is dask_cuda.proxy_object.ProxyObject assert len(dhf) == 0 assert "external" in k1._obj_pxy @@ -167,7 +167,7 @@ def test_externals_setitem(): assert "external" not in k1._obj_pxy assert "external_finalize" not in k1._obj_pxy - k1 = dhf.add_external(cupy.arange(1) + 1) + k1 = dhf.add_external(cupy.arange(1)) k1._obj_pxy_serialize(serializers=("dask", "pickle")) dhf["k1"] = k1 k1 = dhf["k1"] @@ -175,3 +175,13 @@ def test_externals_setitem(): assert len(dhf) == 1 assert "external" not in k1._obj_pxy assert "external_finalize" not in k1._obj_pxy + + dhf["k1"] = cupy.arange(1) + assert len(dhf.proxies_tally.proxy_id_to_proxy) == 1 + assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + k1 = dhf.add_external(k1) + assert len(dhf.proxies_tally.proxy_id_to_proxy) == 1 + assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + k1 = dhf.add_external(dhf["k1"]) + assert len(dhf.proxies_tally.proxy_id_to_proxy) == 1 + assert dhf.proxies_tally.get_dev_mem_usage() == itemsize From 45b3d97eaaaedef4f4768a61082ae1c23d7442de Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 19 Feb 2021 15:33:27 +0100 Subject: [PATCH 07/12] add_external: avoid found objects that are already proxies --- dask_cuda/proxify_device_objects.py | 37 ++++++++++++++++++++++------- dask_cuda/proxify_host_file.py | 9 ++++--- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/dask_cuda/proxify_device_objects.py b/dask_cuda/proxify_device_objects.py index 76a39e70e..9baf5be8f 100644 --- a/dask_cuda/proxify_device_objects.py +++ b/dask_cuda/proxify_device_objects.py @@ -11,6 +11,7 @@ def proxify_device_objects( obj: Any, proxied_id_to_proxy: MutableMapping[int, ProxyObject], found_proxies: List[ProxyObject], + excl_proxies: bool = False, ): """ Wrap device objects in ProxyObject @@ -28,13 +29,15 @@ def proxify_device_objects( found_proxies: List[ProxyObject] List of found proxies in `obj`. Notice, this includes all proxies found, including those already in `proxied_id_to_proxy`. + excl_proxies: bool + Don't add found objects that are already ProxyObject to found_proxies. Returns ------- ret: Any A copy of `obj` where all CUDA device objects are wrapped in ProxyObject """ - return dispatch(obj, proxied_id_to_proxy, found_proxies) + return dispatch(obj, proxied_id_to_proxy, found_proxies, excl_proxies) def proxify(obj, proxied_id_to_proxy, found_proxies, subclass=None): @@ -52,14 +55,18 @@ def proxify(obj, proxied_id_to_proxy, found_proxies, subclass=None): @dispatch.register(object) -def proxify_device_object_default(obj, proxied_id_to_proxy, found_proxies): +def proxify_device_object_default( + obj, proxied_id_to_proxy, found_proxies, excl_proxies +): if hasattr(obj, "__cuda_array_interface__"): return proxify(obj, proxied_id_to_proxy, found_proxies) return obj @dispatch.register(ProxyObject) -def proxify_device_object_proxy_object(obj, proxied_id_to_proxy, found_proxies): +def proxify_device_object_proxy_object( + obj, proxied_id_to_proxy, found_proxies, excl_proxies +): # We deserialize CUDA-serialized objects since it is very cheap and # makes it easy to administrate device memory usage if obj._obj_pxy_is_serialized() and "cuda" in obj._obj_pxy["serializers"]: @@ -81,7 +88,8 @@ def proxify_device_object_proxy_object(obj, proxied_id_to_proxy, found_proxies): _id = id(obj._obj_pxy["obj"]) proxied_id_to_proxy[_id] = obj - found_proxies.append(obj) + if not excl_proxies: + found_proxies.append(obj) return obj @@ -89,13 +97,22 @@ def proxify_device_object_proxy_object(obj, proxied_id_to_proxy, found_proxies): @dispatch.register(tuple) @dispatch.register(set) @dispatch.register(frozenset) -def proxify_device_object_python_collection(seq, proxied_id_to_proxy, found_proxies): - return type(seq)(dispatch(o, proxied_id_to_proxy, found_proxies) for o in seq) +def proxify_device_object_python_collection( + seq, proxied_id_to_proxy, found_proxies, excl_proxies +): + return type(seq)( + dispatch(o, proxied_id_to_proxy, found_proxies, excl_proxies) for o in seq + ) @dispatch.register(dict) -def proxify_device_object_python_dict(seq, proxied_id_to_proxy, found_proxies): - return {k: dispatch(v, proxied_id_to_proxy, found_proxies) for k, v in seq.items()} +def proxify_device_object_python_dict( + seq, proxied_id_to_proxy, found_proxies, excl_proxies +): + return { + k: dispatch(v, proxied_id_to_proxy, found_proxies, excl_proxies) + for k, v in seq.items() + } # Implement cuDF specific proxification @@ -118,7 +135,9 @@ class FrameProxyObject(ProxyObject, cudf._lib.table.Table): @dispatch.register(cudf.DataFrame) @dispatch.register(cudf.Series) @dispatch.register(cudf.Index) - def proxify_device_object_cudf_dataframe(obj, proxied_id_to_proxy, found_proxies): + def proxify_device_object_cudf_dataframe( + obj, proxied_id_to_proxy, found_proxies, excl_proxies + ): return proxify( obj, proxied_id_to_proxy, found_proxies, subclass=FrameProxyObject ) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 49011f012..5b706a55d 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -182,10 +182,13 @@ def get_access_info(self) -> Tuple[int, List[Tuple[int, int, List[ProxyObject]]] return total_dev_mem_usage, dev_buf_access def add_external(self, obj): - # Notice, since `self.store` isn't modified no lock is needed + # Notice, since `self.store` isn't modified, no lock is needed found_proxies: List[ProxyObject] = [] - proxied_id_to_proxy = self.proxies_tally.get_proxied_id_to_proxy() - ret = proxify_device_objects(obj, proxied_id_to_proxy, found_proxies) + proxied_id_to_proxy = {} + # Notice, we are excluding found objects that are already proxies + ret = proxify_device_objects( + obj, proxied_id_to_proxy, found_proxies, excl_proxies=True + ) last_access = time.time() self_weakref = weakref.ref(self) for p in found_proxies: From d440329c92f316d54e28ed409e41d2db85b202d2 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 19 Feb 2021 15:39:07 +0100 Subject: [PATCH 08/12] ProxyObject.__del__(): calls `external_finalize()` In order to trigger the finalization ASAP --- dask_cuda/proxy_object.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 113489783..cfbc26eac 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -199,6 +199,12 @@ def __init__( self._obj_pxy_lock = threading.RLock() self._obj_pxy_cache = {} + def __del__(self): + """In order to call `external_finalize()` ASAP, we call it here""" + external_finalize = self._obj_pxy.get("external_finalize", None) + if external_finalize is not None: + external_finalize() + def _obj_pxy_get_init_args(self, include_obj=True): """Return the attributes needed to initialize a ProxyObject From bc672169f0a26ecdfe934749794212c35a33eb90 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 19 Feb 2021 15:40:39 +0100 Subject: [PATCH 09/12] explicit-comms shuffle: now register externals --- dask_cuda/explicit_comms/dataframe/shuffle.py | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 985bedd87..0e0f0d9a8 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -11,12 +11,13 @@ import dask.dataframe import distributed from dask.base import compute_as_if_collection, tokenize -from dask.dataframe.core import DataFrame, _concat +from dask.dataframe.core import DataFrame, _concat as dd_concat from dask.dataframe.shuffle import shuffle_group from dask.delayed import delayed from distributed import wait from distributed.protocol import nested_deserialize, to_serialize +from ...proxify_host_file import ProxifyHostFile from .. import comms @@ -46,6 +47,7 @@ def sort_in_parts( rank_to_out_part_ids: Dict[int, List[int]], ignore_index: bool, concat_dfs_of_same_output_partition: bool, + concat, ) -> Dict[int, List[List[DataFrame]]]: """ Sort the list of grouped dataframes in `in_parts` @@ -96,7 +98,7 @@ def sort_in_parts( for i in range(len(rank_to_out_parts_list[rank])): if len(rank_to_out_parts_list[rank][i]) > 1: rank_to_out_parts_list[rank][i] = [ - _concat( + concat( rank_to_out_parts_list[rank][i], ignore_index=ignore_index ) ] @@ -144,11 +146,30 @@ async def local_shuffle( eps = s["eps"] assert s["rank"] in workers + try: + hostfile = first(iter(in_parts[0].values()))._obj_pxy.get( + "hostfile", lambda: None + )() + except AttributeError: + hostfile = None + + if isinstance(hostfile, ProxifyHostFile): + + def concat(args, ignore_index=False): + if len(args) < 2: + return args[0] + + return hostfile.add_external(dd_concat(args, ignore_index=ignore_index)) + + else: + concat = dd_concat + rank_to_out_parts_list = sort_in_parts( in_parts, rank_to_out_part_ids, ignore_index, concat_dfs_of_same_output_partition=True, + concat=concat, ) # Communicate all the dataframe-partitions all-to-all. The result is @@ -176,7 +197,7 @@ async def local_shuffle( dfs.extend(rank_to_out_parts_list[myrank][i]) rank_to_out_parts_list[myrank][i] = None if len(dfs) > 1: - ret.append(_concat(dfs, ignore_index=ignore_index)) + ret.append(concat(dfs, ignore_index=ignore_index)) else: ret.append(dfs[0]) return ret From c6321bd2c293cb8a70b38ac530a1a6785664eaa1 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 23 Feb 2021 10:35:25 +0100 Subject: [PATCH 10/12] add_external(): added docstring --- dask_cuda/proxify_host_file.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index 5b706a55d..e202d169f 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -182,6 +182,24 @@ def get_access_info(self) -> Tuple[int, List[Tuple[int, int, List[ProxyObject]]] return total_dev_mem_usage, dev_buf_access def add_external(self, obj): + """Add an external object to the hostfile that count against the + device_memory_limit but isn't part of the store. + + Normally, we use __setitem__ to store objects in the hostfile and make it + count against the device_memory_limit with the inherent consequence that + the objects are not freeable before subsequential calls to __delitem__. + This is a problem for long running tasks that want objects to count against + the device_memory_limit while freeing them ASAP without explicit calls to + __delitem__. + + Developer Notes + --------------- + In order to avoid holding references to the found proxies in `obj`, we + wrap them in `weakref.proxy(p)` and adds them to the `proxies_tally`. + In order to remove them from the `proxies_tally` again, we attach a + finalize(p) on the wrapped proxies that calls del_external(). + """ + # Notice, since `self.store` isn't modified, no lock is needed found_proxies: List[ProxyObject] = [] proxied_id_to_proxy = {} From 8601825667f8218bc55121acac47c4472ec94bc0 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 23 Feb 2021 11:15:32 +0100 Subject: [PATCH 11/12] Use time.monotonic() --- dask_cuda/proxify_host_file.py | 4 ++-- dask_cuda/proxy_object.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index e202d169f..477f94b5e 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -207,7 +207,7 @@ def add_external(self, obj): ret = proxify_device_objects( obj, proxied_id_to_proxy, found_proxies, excl_proxies=True ) - last_access = time.time() + last_access = time.monotonic() self_weakref = weakref.ref(self) for p in found_proxies: name = id(p) @@ -235,7 +235,7 @@ def __setitem__(self, key, value): self.store[key] = proxify_device_objects( value, proxied_id_to_proxy, found_proxies ) - last_access = time.time() + last_access = time.monotonic() self_weakref = weakref.ref(self) for p in found_proxies: p._obj_pxy["hostfile"] = self_weakref diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index cfbc26eac..206677d8d 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -321,7 +321,7 @@ def _obj_pxy_deserialize(self): external = self._obj_pxy.get("external", self) hostfile.proxies_tally.unspill_proxy(external) - self._obj_pxy["last_access"] = time.time() + self._obj_pxy["last_access"] = time.monotonic() return self._obj_pxy["obj"] def _obj_pxy_is_cuda_object(self) -> bool: From d8f371572dd38997d909acef2d51a406a23f4824 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 23 Feb 2021 11:23:56 +0100 Subject: [PATCH 12/12] tests: defined `one_item_array` and `one_item_nbytes` --- dask_cuda/tests/test_proxify_host_file.py | 41 ++++++++++++----------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 7bff3dca3..7e3053a2e 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -12,13 +12,14 @@ cupy = pytest.importorskip("cupy") cupy.cuda.set_allocator(None) -itemsize = cupy.arange(1).nbytes +one_item_array = lambda: cupy.arange(1) +one_item_nbytes = one_item_array().nbytes def test_one_item_limit(): - dhf = ProxifyHostFile(device_memory_limit=itemsize) - dhf["k1"] = cupy.arange(1) + 1 - dhf["k2"] = cupy.arange(1) + 2 + dhf = ProxifyHostFile(device_memory_limit=one_item_nbytes) + dhf["k1"] = one_item_array() + 42 + dhf["k2"] = one_item_array() # Check k1 is spilled because of the newer k2 k1 = dhf["k1"] @@ -28,7 +29,7 @@ def test_one_item_limit(): # Accessing k1 spills k2 and unspill k1 k1_val = k1[0] - assert k1_val == 1 + assert k1_val == 42 assert k2._obj_pxy_is_serialized() # Duplicate arrays changes nothing @@ -37,7 +38,7 @@ def test_one_item_limit(): assert k2._obj_pxy_is_serialized() # Adding a new array spills k1 and k2 - dhf["k4"] = cupy.arange(1) + 4 + dhf["k4"] = one_item_array() assert k1._obj_pxy_is_serialized() assert k2._obj_pxy_is_serialized() assert not dhf["k4"]._obj_pxy_is_serialized() @@ -50,11 +51,11 @@ def test_one_item_limit(): # Deleting k2 does not change anything since k3 still holds a # reference to the underlying proxy object - assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + assert dhf.proxies_tally.get_dev_mem_usage() == one_item_nbytes p1 = list(dhf.proxies_tally.get_unspilled_proxies()) assert len(p1) == 1 del dhf["k2"] - assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + assert dhf.proxies_tally.get_dev_mem_usage() == one_item_nbytes p2 = list(dhf.proxies_tally.get_unspilled_proxies()) assert len(p2) == 1 assert p1[0] is p2[0] @@ -114,7 +115,7 @@ def test_dataframes_share_dev_mem(): assert not v1._obj_pxy_is_serialized() assert not v2._obj_pxy_is_serialized() # Now the device_memory_limit is exceeded, which should evict both dataframes - dhf["k1"] = cupy.arange(1) + dhf["k1"] = one_item_array() assert v1._obj_pxy_is_serialized() assert v2._obj_pxy_is_serialized() @@ -132,10 +133,10 @@ def test_cudf_get_device_memory_objects(): def test_externals(): - dhf = ProxifyHostFile(device_memory_limit=itemsize) - dhf["k1"] = cupy.arange(1) + 1 + dhf = ProxifyHostFile(device_memory_limit=one_item_nbytes) + dhf["k1"] = one_item_array() k1 = dhf["k1"] - k2 = dhf.add_external(cupy.arange(1) + 1) + k2 = dhf.add_external(one_item_array()) # `k2` isn't part of the store but still triggers spilling of `k1` assert len(dhf) == 1 assert k1._obj_pxy_is_serialized() @@ -146,7 +147,7 @@ def test_externals(): k2[0] # Trigger spilling of `k1` assert k1._obj_pxy_is_serialized() assert not k2._obj_pxy_is_serialized() - assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + assert dhf.proxies_tally.get_dev_mem_usage() == one_item_nbytes # Removing `k2` also removes it from the tally del k2 assert dhf.proxies_tally.get_dev_mem_usage() == 0 @@ -154,8 +155,8 @@ def test_externals(): def test_externals_setitem(): - dhf = ProxifyHostFile(device_memory_limit=itemsize) - k1 = dhf.add_external(cupy.arange(1)) + dhf = ProxifyHostFile(device_memory_limit=one_item_nbytes) + k1 = dhf.add_external(one_item_array()) assert type(k1) is dask_cuda.proxy_object.ProxyObject assert len(dhf) == 0 assert "external" in k1._obj_pxy @@ -167,7 +168,7 @@ def test_externals_setitem(): assert "external" not in k1._obj_pxy assert "external_finalize" not in k1._obj_pxy - k1 = dhf.add_external(cupy.arange(1)) + k1 = dhf.add_external(one_item_array()) k1._obj_pxy_serialize(serializers=("dask", "pickle")) dhf["k1"] = k1 k1 = dhf["k1"] @@ -176,12 +177,12 @@ def test_externals_setitem(): assert "external" not in k1._obj_pxy assert "external_finalize" not in k1._obj_pxy - dhf["k1"] = cupy.arange(1) + dhf["k1"] = one_item_array() assert len(dhf.proxies_tally.proxy_id_to_proxy) == 1 - assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + assert dhf.proxies_tally.get_dev_mem_usage() == one_item_nbytes k1 = dhf.add_external(k1) assert len(dhf.proxies_tally.proxy_id_to_proxy) == 1 - assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + assert dhf.proxies_tally.get_dev_mem_usage() == one_item_nbytes k1 = dhf.add_external(dhf["k1"]) assert len(dhf.proxies_tally.proxy_id_to_proxy) == 1 - assert dhf.proxies_tally.get_dev_mem_usage() == itemsize + assert dhf.proxies_tally.get_dev_mem_usage() == one_item_nbytes