From cfac32ec319ee8298a673b9eebd06d06d9e49d01 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 10 Mar 2022 13:22:37 -0800
Subject: [PATCH 1/2] Implement `DeviceHostFile.evict()`

---
 dask_cuda/device_host_file.py | 10 ++++++++++
 dask_cuda/tests/test_spill.py | 24 ++++++++++++++++++++++--
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py
index f89a713aa..ae8e53de7 100644
--- a/dask_cuda/device_host_file.py
+++ b/dask_cuda/device_host_file.py
@@ -260,6 +260,16 @@ def __delitem__(self, key):
         self.device_keys.discard(key)
         del self.device_buffer[key]
 
+    def evict(self):
+        """Evicts least recently used host buffer (aka, CPU or system memory)
+
+        Implements distributed.spill.ManualEvictProto interface"""
+        try:
+            _, _, weight = self.host_buffer.fast.evict()
+            return weight
+        except Exception:  # We catch all `Exception`s, just like zict.LRU
+            return -1
+
     def set_address(self, addr):
         if isinstance(self.host_buffer, LoggedBuffer):
             self.host_buffer.set_address(addr)
diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index 50e530a4f..9f05be395 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -84,7 +84,7 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
     [
         {
             "device_memory_limit": int(200e6),
-            "memory_limit": int(800e6),
+            "memory_limit": int(2000e6),
             "host_target": False,
             "host_spill": False,
             "host_pause": False,
@@ -98,6 +98,16 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
             "host_pause": False,
             "spills_to_disk": True,
         },
+        {
+            # This test setup differs from the one above as Distributed worker
+            # pausing is enabled and thus triggers `DeviceHostFile.evict()`
+            "device_memory_limit": int(200e6),
+            "memory_limit": int(1000e6),
+            "host_target": None,
+            "host_spill": None,
+            "host_pause": None,
+            "spills_to_disk": True,
+        },
         {
             "device_memory_limit": int(200e6),
             "memory_limit": None,
@@ -159,7 +169,7 @@ async def test_cupy_cluster_device_spill(params):
     [
         {
             "device_memory_limit": int(200e6),
-            "memory_limit": int(800e6),
+            "memory_limit": int(4000e6),
             "host_target": False,
             "host_spill": False,
             "host_pause": False,
@@ -173,6 +183,16 @@ async def test_cupy_cluster_device_spill(params):
             "host_pause": False,
             "spills_to_disk": True,
         },
+        {
+            # This test setup differs from the one above as Distributed worker
+            # pausing is enabled and thus triggers `DeviceHostFile.evict()`
+            "device_memory_limit": int(200e6),
+            "memory_limit": int(2000e6),
+            "host_target": None,
+            "host_spill": None,
+            "host_pause": None,
+            "spills_to_disk": True,
+        },
         {
             "device_memory_limit": int(200e6),
             "memory_limit": None,

From 7c9d6fcbb68bb854850f09ee4d698142afc64282 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Mon, 14 Mar 2022 13:53:09 -0700
Subject: [PATCH 2/2] Add timeouts and prevent test_spill from pausing

---
 dask_cuda/tests/test_spill.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index 9f05be395..9325d6f43 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -105,7 +105,7 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
             "memory_limit": int(1000e6),
             "host_target": None,
             "host_spill": None,
-            "host_pause": None,
+            "host_pause": False,
             "spills_to_disk": True,
         },
         {
@@ -119,6 +119,7 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
     ],
 )
 @pytest.mark.asyncio
+@gen_test(timeout=20)
 async def test_cupy_cluster_device_spill(params):
     cupy = pytest.importorskip("cupy")
     with dask.config.set({"distributed.worker.memory.terminate": False}):
@@ -190,7 +191,7 @@ async def test_cupy_cluster_device_spill(params):
             "memory_limit": int(2000e6),
             "host_target": None,
             "host_spill": None,
-            "host_pause": None,
+            "host_pause": False,
             "spills_to_disk": True,
         },
         {
@@ -204,6 +205,7 @@ async def test_cupy_cluster_device_spill(params):
     ],
 )
 @pytest.mark.asyncio
+@gen_test(timeout=20)
 async def test_cudf_cluster_device_spill(params):
     cudf = pytest.importorskip("cudf")
 