From 3439b83091c78169e4ed5145fc594e8b42a5c3d1 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Fri, 9 Apr 2021 13:20:01 -0400
Subject: [PATCH 1/2] Remove `--death-timeout` option from `dask-cuda-worker`
 (#563)

This removes `death_timeout` from the `dask-cuda-worker` CLI and `CUDAWorker`,
as it was not being used for anything. I think we could get it working by
passing its value on to the nanny processes spawned by `CUDAWorker`, but
@pentschev suggested it would be better to remove it, since its lack of
functionality hasn't been a problem up until now.

Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: https://github.com/rapidsai/dask-cuda/pull/563
---
 dask_cuda/cli/dask_cuda_worker.py | 8 --------
 dask_cuda/cuda_worker.py          | 1 -
 dask_cuda/tests/test_spill.py     | 2 --
 3 files changed, 11 deletions(-)

diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py
index aac203da7..1e2991a96 100755
--- a/dask_cuda/cli/dask_cuda_worker.py
+++ b/dask_cuda/cli/dask_cuda_worker.py
@@ -144,12 +144,6 @@
     help="Filename to JSON encoded scheduler information. "
     "Use with dask-scheduler --scheduler-file",
 )
-@click.option(
-    "--death-timeout",
-    type=str,
-    default=None,
-    help="Seconds to wait for a scheduler before closing",
-)
 @click.option(
     "--dashboard-prefix", type=str, default=None, help="Prefix for the Dashboard"
 )
@@ -225,7 +219,6 @@ def main(
     local_directory,
     scheduler_file,
     interface,
-    death_timeout,
     preload,
     dashboard_prefix,
     tls_ca_file,
@@ -270,7 +263,6 @@ def main(
         local_directory,
         scheduler_file,
         interface,
-        death_timeout,
         preload,
         dashboard_prefix,
         security,
diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py
index 38f2dd6e7..cd452a648 100644
--- a/dask_cuda/cuda_worker.py
+++ b/dask_cuda/cuda_worker.py
@@ -64,7 +64,6 @@ def __init__(
         local_directory=None,
         scheduler_file=None,
         interface=None,
-        death_timeout=None,
         preload=[],
         dashboard_prefix=None,
         security=None,
diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index 95e161468..d15704078 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -198,7 +198,6 @@ async def test_cupy_cluster_device_spill(params):
         silence_logs=False,
         dashboard_address=None,
         asynchronous=True,
-        death_timeout=60,
         device_memory_limit=params["device_memory_limit"],
         memory_limit=params["memory_limit"],
         memory_target_fraction=params["host_target"],
@@ -365,7 +364,6 @@ async def test_cudf_cluster_device_spill(params):
         memory_target_fraction=params["host_target"],
         memory_spill_fraction=params["host_spill"],
         memory_pause_fraction=params["host_pause"],
-        death_timeout=60,
         asynchronous=True,
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
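For illustration, the nanny-forwarding alternative mentioned in the description above could look roughly like the sketch below. It is not part of the patch: `make_nanny_kwargs` is a hypothetical helper, and the only assumption about the real API is that `distributed.Nanny` accepts a `death_timeout` keyword argument.

```python
# Hypothetical sketch only, not part of the patch: forward a death-timeout
# value to the nanny processes a CUDA worker would spawn.
from distributed import Nanny  # Nanny accepts a death_timeout keyword


def make_nanny_kwargs(death_timeout=None, **extra):
    """Collect keyword arguments shared by every Nanny the worker spawns."""
    kwargs = dict(extra)
    if death_timeout is not None:
        # Seconds the nanny waits for the scheduler before shutting itself down.
        kwargs["death_timeout"] = death_timeout
    return kwargs


# One Nanny per GPU, each inheriting the same death timeout:
# nannies = [
#     Nanny("tcp://127.0.0.1:8786", **make_nanny_kwargs(death_timeout=60))
#     for _ in range(n_gpus)
# ]
```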
From 1526017749ae0a3137c40605bb992d426d9fee3b Mon Sep 17 00:00:00 2001
From: "Mads R. B. Kristensen"
Date: Mon, 12 Apr 2021 10:43:32 +0200
Subject: [PATCH 2/2] Explicit-comms: lock workers (#523)

This PR uses https://github.com/dask/distributed/pull/4503 to support locking
of workers before submitting an explicit-comms job.

- [x] Implement `CommsContext.run(..., lock_workers=False)`
- [x] Implement tests

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: https://github.com/rapidsai/dask-cuda/pull/523
---
 dask_cuda/explicit_comms/comms.py      | 59 ++++++++++++++++++++------
 dask_cuda/tests/test_explicit_comms.py | 58 +++++++++++++++++++++++--
 2 files changed, 100 insertions(+), 17 deletions(-)

diff --git a/dask_cuda/explicit_comms/comms.py b/dask_cuda/explicit_comms/comms.py
index df5e46f32..1de033e32 100644
--- a/dask_cuda/explicit_comms/comms.py
+++ b/dask_cuda/explicit_comms/comms.py
@@ -1,5 +1,6 @@
 import asyncio
 import concurrent.futures
+import contextlib
 import time
 import uuid
 from typing import List, Optional
@@ -11,6 +12,33 @@
 _default_comms = None


+def get_multi_lock_or_null_context(multi_lock_context, *args, **kwargs):
+    """Return either a MultiLock or a NULL context
+
+    Parameters
+    ----------
+    multi_lock_context: bool
+        If True return MultiLock context else return a NULL context that
+        doesn't do anything
+
+    *args, **kwargs:
+        Arguments passed to the MultiLock creation
+
+    Returns
+    -------
+    context: context
+        Either `MultiLock(*args, **kwargs)` or a NULL context
+    """
+    if multi_lock_context:
+        from distributed import MultiLock
+
+        return MultiLock(*args, **kwargs)
+    else:
+        # Use a null context that doesn't do anything
+        # TODO: use `contextlib.nullcontext()` from Python 3.7+
+        return contextlib.suppress()
+
+
 def default_comms(client: Optional[Client] = None) -> "CommsContext":
     """Return the default comms object

@@ -194,7 +222,7 @@ def submit(self, worker, coroutine, *args, wait=False):
         )
         return ret.result() if wait else ret

-    def run(self, coroutine, *args, workers=None):
+    def run(self, coroutine, *args, workers=None, lock_workers=False):
         """Run a coroutine on multiple workers

         The coroutine is given the worker's state dict as the first argument
@@ -208,6 +236,9 @@ def run(self, coroutine, *args, workers=None):
             Arguments for `coroutine`
         workers: list, optional
             List of workers. Default is all workers
+        lock_workers: bool, optional
+            Use distributed.MultiLock to get exclusive access to the workers. Use
+            this flag to support parallel runs.

         Returns
         -------
@@ -216,16 +247,18 @@ def run(self, coroutine, *args, workers=None):
         """
         if workers is None:
             workers = self.worker_addresses
-        ret = []
-        for worker in workers:
-            ret.append(
-                self.client.submit(
-                    _run_coroutine_on_worker,
-                    self.sessionId,
-                    coroutine,
-                    args,
-                    workers=[worker],
-                    pure=False,
+
+        with get_multi_lock_or_null_context(lock_workers, workers):
+            ret = []
+            for worker in workers:
+                ret.append(
+                    self.client.submit(
+                        _run_coroutine_on_worker,
+                        self.sessionId,
+                        coroutine,
+                        args,
+                        workers=[worker],
+                        pure=False,
+                    )
                 )
-            )
-        return self.client.gather(ret)
+            return self.client.gather(ret)
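The TODO inside `get_multi_lock_or_null_context` points at `contextlib.nullcontext()`, which is available from Python 3.7 onward. A minimal sketch of the same helper targeting 3.7+, where the no-op branch becomes explicit, could look like this (not part of the patch; the lazy `MultiLock` import mirrors the code above, everything else is standard library):

```python
import contextlib


def get_multi_lock_or_null_context(multi_lock_context, *args, **kwargs):
    """Return a MultiLock when locking is requested, otherwise a no-op context."""
    if multi_lock_context:
        # Imported lazily so distributed versions without MultiLock only fail
        # when locking is actually requested.
        from distributed import MultiLock

        return MultiLock(*args, **kwargs)
    # An explicit "do nothing" context manager; contextlib.suppress() with no
    # arguments, as used in the patch, behaves the same way on older Pythons.
    return contextlib.nullcontext()
```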
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 87beefb67..0f7f9c14e 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -1,3 +1,4 @@
+import asyncio
 import multiprocessing as mp

 import numpy as np
@@ -7,7 +8,7 @@
 import dask
 from dask import dataframe as dd
 from dask.dataframe.shuffle import partitioning_index
-from distributed import Client
+from distributed import Client, get_worker
 from distributed.deploy.local import LocalCluster

 import dask_cuda
@@ -22,8 +23,8 @@
 # that UCX options of the different tests doesn't conflict.


-async def my_rank(state):
-    return state["rank"]
+async def my_rank(state, arg):
+    return state["rank"] + arg


 def _test_local_cluster(protocol):
@@ -36,7 +37,7 @@ def _test_local_cluster(protocol):
     ) as cluster:
         with Client(cluster) as client:
             c = comms.CommsContext(client)
-            assert sum(c.run(my_rank)) == sum(range(4))
+            assert sum(c.run(my_rank, 0)) == sum(range(4))


 @pytest.mark.parametrize("protocol", ["tcp", "ucx"])
@@ -290,3 +291,52 @@ def test_jit_unspill(protocol):
     p.start()
     p.join()
     assert not p.exitcode
+
+
+def _test_lock_workers(scheduler_address, ranks):
+    async def f(_):
+        worker = get_worker()
+        if hasattr(worker, "running"):
+            assert not worker.running
+        worker.running = True
+        await asyncio.sleep(0.5)
+        assert worker.running
+        worker.running = False
+
+    with Client(scheduler_address) as client:
+        c = comms.CommsContext(client)
+        c.run(f, workers=[c.worker_addresses[r] for r in ranks], lock_workers=True)
+
+
+def test_lock_workers():
+    """
+    Testing `run(...,lock_workers=True)` by spawning 30 runs with overlapping
+    and non-overlapping worker sets.
+    """
+    try:
+        from distributed import MultiLock  # noqa F401
+    except ImportError as e:
+        pytest.skip(str(e))
+
+    with LocalCluster(
+        protocol="tcp",
+        dashboard_address=None,
+        n_workers=4,
+        threads_per_worker=5,
+        processes=True,
+    ) as cluster:
+        ps = []
+        for _ in range(5):
+            for ranks in [[0, 1], [1, 3], [2, 3]]:
+                ps.append(
+                    mp.Process(
+                        target=_test_lock_workers,
+                        args=(cluster.scheduler_address, ranks),
+                    )
+                )
+                ps[-1].start()
+
+        for p in ps:
+            p.join()
+
+        assert all(p.exitcode == 0 for p in ps)
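Taken together, the patches leave `CommsContext.run()` with an optional `lock_workers` flag. A small usage sketch follows; it mirrors the test added above rather than anything shipped in the patches, and assumes a `distributed` release that provides `MultiLock` (the cluster size and the `10` offset are arbitrary):

```python
from distributed import Client
from distributed.deploy.local import LocalCluster

from dask_cuda.explicit_comms import comms


async def my_rank(state, arg):
    # Explicit-comms coroutines receive the worker's state dict first.
    return state["rank"] + arg


if __name__ == "__main__":
    with LocalCluster(protocol="tcp", n_workers=2, processes=True) as cluster:
        with Client(cluster) as client:
            c = comms.CommsContext(client)
            # With lock_workers=True this run holds a MultiLock on its worker
            # set, so a concurrent run touching the same workers waits its turn.
            print(c.run(my_rank, 10, lock_workers=True))  # e.g. [10, 11]
```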