Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock and simplify proxy tracking #712

Merged
merged 27 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ce4a260
clean up
madsbk Aug 20, 2021
617a91a
mypy: type hints
madsbk Aug 20, 2021
dab3964
ProxyObject: clean up
madsbk Aug 20, 2021
c932100
DeviceHostFile: clean up local_directory argument
madsbk Aug 20, 2021
ed19cb8
clean up
madsbk Aug 20, 2021
e7975d1
Clean up
madsbk Aug 20, 2021
716a179
More type hints
madsbk Aug 26, 2021
660ca0f
Disallowing proxy of collections
madsbk Aug 26, 2021
871ac97
Now tracking a proxy's actual serializer
madsbk Aug 26, 2021
d98db98
Re-implemented proxy tracking
madsbk Aug 27, 2021
1406076
get_device_memory_objects(): now incl. serialized data
madsbk Sep 6, 2021
d8136dc
ProxyObject: call the new finalizer
madsbk Sep 6, 2021
704037f
proxify(): remove the external finalizer
madsbk Sep 6, 2021
252e717
Fixed _mem_usage typo
madsbk Sep 6, 2021
2d7f030
Tracking now passes all tests
madsbk Sep 6, 2021
531932f
Some docs
madsbk Sep 6, 2021
7d5ab18
Minor clean up
madsbk Sep 6, 2021
c15c394
minor doc clean up
madsbk Sep 7, 2021
4cc1a3e
Proxies now use the manager's lock
madsbk Sep 7, 2021
19587c3
Docs and clean up
madsbk Sep 7, 2021
3b614fe
mem_usage_add(): proxy can have an empty set of dev buffers
madsbk Sep 7, 2021
a70e664
Proxies.remove(): raise warning when the tally isn't going to zero
madsbk Sep 7, 2021
451d752
now using the proxy's dtor instead of a finalizer
madsbk Sep 7, 2021
23c8b4b
Implements get_proxies_by_serializer()
madsbk Sep 7, 2021
a4fdb60
removed evict()
madsbk Sep 7, 2021
5c4ca1a
Clean up by @pentschev
madsbk Sep 8, 2021
4225364
Renamed mem_usage_sub => mem_usage_remove
madsbk Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions dask_cuda/device_host_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,12 @@ def __init__(
local_directory=None,
log_spilling=False,
):
if local_directory is None:
local_directory = dask.config.get("temporary-directory") or os.getcwd()

if local_directory and not os.path.exists(local_directory):
os.makedirs(local_directory, exist_ok=True)
local_directory = os.path.join(local_directory, "dask-worker-space")

self.disk_func_path = os.path.join(local_directory, "storage")
self.disk_func_path = os.path.join(
local_directory or dask.config.get("temporary-directory") or os.getcwd(),
"dask-worker-space",
"storage",
)
os.makedirs(self.disk_func_path, exist_ok=True)

self.host_func = dict()
self.disk_func = Func(
Expand Down
12 changes: 5 additions & 7 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from distributed import wait
from distributed.protocol import nested_deserialize, to_serialize

from ...proxify_host_file import ProxifyHostFile
from ...proxify_host_file import ProxyManager
from .. import comms


Expand Down Expand Up @@ -148,19 +148,17 @@ async def local_shuffle(
eps = s["eps"]

try:
hostfile = first(iter(in_parts[0].values()))._obj_pxy.get(
"hostfile", lambda: None
)()
manager = first(iter(in_parts[0].values()))._obj_pxy.get("manager", None)
except AttributeError:
hostfile = None
manager = None

if isinstance(hostfile, ProxifyHostFile):
if isinstance(manager, ProxyManager):

def concat(args, ignore_index=False):
if len(args) < 2:
return args[0]

return hostfile.add_external(dd_concat(args, ignore_index=ignore_index))
return manager.proxify(dd_concat(args, ignore_index=ignore_index))

else:
concat = dd_concat
Expand Down
5 changes: 1 addition & 4 deletions dask_cuda/get_device_memory_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ def get_device_memory_objects(obj) -> set:
@dispatch.register(object)
def get_device_memory_objects_default(obj):
if hasattr(obj, "_obj_pxy"):
if obj._obj_pxy["serializers"] is None:
return dispatch(obj._obj_pxy["obj"])
else:
return []
return dispatch(obj._obj_pxy["obj"])
if hasattr(obj, "data"):
return dispatch(obj.data)
if hasattr(obj, "_owner") and obj._owner is not None:
Expand Down
4 changes: 1 addition & 3 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ def __init__(
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.host_memory_limit,
"local_directory": local_directory
or dask.config.get("temporary-directory")
or os.getcwd(),
"local_directory": local_directory,
"log_spilling": log_spilling,
},
)
Expand Down
24 changes: 3 additions & 21 deletions dask_cuda/proxify_device_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,9 @@ def wrapper(*args, **kwargs):

def proxify(obj, proxied_id_to_proxy, found_proxies, subclass=None):
_id = id(obj)
if _id in proxied_id_to_proxy:
ret = proxied_id_to_proxy[_id]
finalize = ret._obj_pxy.get("external_finalize", None)
if finalize:
finalize()
proxied_id_to_proxy[_id] = ret = asproxy(obj, subclass=subclass)
else:
proxied_id_to_proxy[_id] = ret = asproxy(obj, subclass=subclass)
if _id not in proxied_id_to_proxy:
proxied_id_to_proxy[_id] = asproxy(obj, subclass=subclass)
ret = proxied_id_to_proxy[_id]
found_proxies.append(ret)
return ret

Expand All @@ -190,11 +185,6 @@ def proxify_device_object_default(
def proxify_device_object_proxy_object(
obj, proxied_id_to_proxy, found_proxies, excl_proxies
):
# We deserialize CUDA-serialized objects since it is very cheap and
# makes it easy to administrate device memory usage
if obj._obj_pxy_is_serialized() and "cuda" in obj._obj_pxy["serializers"]:
obj._obj_pxy_deserialize()

# Check if `obj` is already known
if not obj._obj_pxy_is_serialized():
_id = id(obj._obj_pxy["obj"])
Expand All @@ -203,14 +193,6 @@ def proxify_device_object_proxy_object(
else:
proxied_id_to_proxy[_id] = obj

finalize = obj._obj_pxy.get("external_finalize", None)
if finalize:
finalize()
obj = obj._obj_pxy_copy()
if not obj._obj_pxy_is_serialized():
_id = id(obj._obj_pxy["obj"])
proxied_id_to_proxy[_id] = obj

if not excl_proxies:
found_proxies.append(obj)
return obj
Expand Down
Loading