diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d5cf682b9..4747bca4e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - PR #103: Epsilon parameter for Cholesky rank one update - PR #100: Add divyegala as codeowner - PR #111: Cleanup gpuCI scripts +- PR #120: Update NCCL init process to support root node placement. ## Bug Fixes - PR #106: Specify dependency branches to avoid pip resolver failure diff --git a/python/raft/dask/common/comms.py b/python/raft/dask/common/comms.py index 4278783968..27533dfb9a 100644 --- a/python/raft/dask/common/comms.py +++ b/python/raft/dask/common/comms.py @@ -26,10 +26,13 @@ import warnings +import logging import time import uuid from collections import OrderedDict +logger = logging.getLogger(__name__) + class Comms: @@ -77,8 +80,16 @@ def _use_comms(sessionId): cluster.close() """ - def __init__(self, comms_p2p=False, client=None, verbose=False, - streams_per_handle=0): + valid_nccl_placements = ("client", "worker", "scheduler") + + def __init__( + self, + comms_p2p=False, + client=None, + verbose=False, + streams_per_handle=0, + nccl_root_location="scheduler", + ): """ Construct a new CommsContext instance @@ -90,10 +101,22 @@ def __init__(self, comms_p2p=False, client=None, verbose=False, Dask client to use verbose : bool Print verbose logging + nccl_root_location : string + Indicates where the NCCL's root node should be located. + ['client', 'worker', 'scheduler' (default)] + """ self.client = client if client is not None else default_client() + self.comms_p2p = comms_p2p + self.nccl_root_location = nccl_root_location.lower() + if self.nccl_root_location not in Comms.valid_nccl_placements: + raise ValueError( + f"nccl_root_location must be one of: " + f"{Comms.valid_nccl_placements}" + ) + self.streams_per_handle = streams_per_handle self.sessionId = uuid.uuid4().bytes @@ -110,14 +133,33 @@ def __del__(self): if self.nccl_initialized or self.ucx_initialized: self.destroy() + def create_nccl_uniqueid(self): + if self.nccl_root_location == "client": + self.uniqueId = nccl.get_unique_id() + elif self.nccl_root_location == "worker": + self.uniqueId = self.client.run( + _func_set_worker_as_nccl_root, + sessionId=self.sessionId, + verbose=self.verbose, + workers=[self.worker_addresses[0]], + wait=True, + )[self.worker_addresses[0]] + else: + self.uniqueId = self.client.run_on_scheduler( + _func_set_scheduler_as_nccl_root, + sessionId=self.sessionId, + verbose=self.verbose, + ) + def worker_info(self, workers): """ Builds a dictionary of { (worker_address, worker_port) : (worker_rank, worker_port ) } """ ranks = _func_worker_ranks(workers) - ports = _func_ucp_ports(self.client, workers) \ - if self.comms_p2p else None + ports = ( + _func_ucp_ports(self.client, workers) if self.comms_p2p else None + ) output = {} for k in ranks.keys(): @@ -138,9 +180,13 @@ def init(self, workers=None): Unique collection of workers for initializing comms. 
""" - self.worker_addresses = list(OrderedDict.fromkeys( - self.client.scheduler_info()["workers"].keys() - if workers is None else workers)) + self.worker_addresses = list( + OrderedDict.fromkeys( + self.client.scheduler_info()["workers"].keys() + if workers is None + else workers + ) + ) if self.nccl_initialized or self.ucx_initialized: warnings.warn("Comms have already been initialized.") @@ -149,17 +195,19 @@ def init(self, workers=None): worker_info = self.worker_info(self.worker_addresses) worker_info = {w: worker_info[w] for w in self.worker_addresses} - self.uniqueId = nccl.get_unique_id() + self.create_nccl_uniqueid() - self.client.run(_func_init_all, - self.sessionId, - self.uniqueId, - self.comms_p2p, - worker_info, - self.verbose, - self.streams_per_handle, - workers=self.worker_addresses, - wait=True) + self.client.run( + _func_init_all, + self.sessionId, + self.uniqueId, + self.comms_p2p, + worker_info, + self.verbose, + self.streams_per_handle, + workers=self.worker_addresses, + wait=True, + ) self.nccl_initialized = True @@ -175,12 +223,19 @@ def destroy(self): be called automatically by the Comms destructor, but may be called earlier to save resources. """ - self.client.run(_func_destroy_all, - self.sessionId, - self.comms_p2p, - self.verbose, - wait=True, - workers=self.worker_addresses) + self.client.run( + _func_destroy_all, + self.sessionId, + self.comms_p2p, + self.verbose, + wait=True, + workers=self.worker_addresses, + ) + + if self.nccl_root_location == "scheduler": + self.client.run_on_scheduler( + _func_destroy_scheduler_session, self.sessionId + ) if self.verbose: print("Destroying comms.") @@ -190,7 +245,8 @@ def destroy(self): def local_handle(sessionId): - """Simple helper function for retrieving the local handle_t instance + """ + Simple helper function for retrieving the local handle_t instance for a comms session on a worker. Parameters @@ -203,32 +259,57 @@ def local_handle(sessionId): handle : raft.Handle or None """ - state = worker_state(sessionId) + state = get_raft_comm_state(sessionId, get_worker()) return state["handle"] if "handle" in state else None -def worker_state(sessionId=None): +def get_raft_comm_state(sessionId, state_object=None): """ - Retrieves cuML comms state on local worker for the given - sessionId, creating a new session if it does not exist. - If no session id is given, returns the state dict for all - sessions. + Retrieves cuML comms state on the scheduler node, for the given sessionId, + creating a new session if it does not exist. If no session id is given, + returns the state dict for all sessions. 
     Parameters
     ----------
-    sessionId : str
-        session identifier from initialized comms instance
+    sessionId : session identifier whose state should be retrieved or created
+    state_object : Object (either a Worker or the Scheduler) on which the
+                   raft comm state will be retrieved (or created)
+
+    Returns
+    -------
+
+    session state : dict
+        session state associated with sessionId
     """
-    worker = get_worker()
-    if not hasattr(worker, "_raft_comm_state"):
-        worker._raft_comm_state = {}
-    if sessionId is not None and sessionId not in worker._raft_comm_state:
-        # Build state for new session and mark session creation time
-        worker._raft_comm_state[sessionId] = {"ts": time.time()}
+    state_object = state_object if state_object is not None else get_worker()
+
+    if not hasattr(state_object, "_raft_comm_state"):
+        state_object._raft_comm_state = {}
+
+    if (
+        sessionId is not None
+        and sessionId not in state_object._raft_comm_state
+    ):
+        state_object._raft_comm_state[sessionId] = {"ts": time.time()}
 
     if sessionId is not None:
-        return worker._raft_comm_state[sessionId]
-    return worker._raft_comm_state
+        return state_object._raft_comm_state[sessionId]
+
+    return state_object._raft_comm_state
+
+
+def set_nccl_root(sessionId, state_object):
+    if sessionId is None:
+        raise ValueError("sessionId cannot be None.")
+
+    raft_comm_state = get_raft_comm_state(
+        sessionId=sessionId, state_object=state_object
+    )
+
+    if "nccl_uid" not in raft_comm_state:
+        raft_comm_state["nccl_uid"] = nccl.get_unique_id()
+
+    return raft_comm_state["nccl_uid"]
 
 
 def get_ucx():
@@ -236,36 +317,126 @@
     """
     A simple convenience wrapper to make sure UCP listener and
    endpoints are only ever assigned once per worker.
     """
-    if "ucx" not in worker_state("ucp"):
-        worker_state("ucp")["ucx"] = UCX.get()
-    return worker_state("ucp")["ucx"]
+    raft_comm_state = get_raft_comm_state(
+        sessionId="ucp", state_object=get_worker()
+    )
+    if "ucx" not in raft_comm_state:
+        raft_comm_state["ucx"] = UCX.get()
+
+    return raft_comm_state["ucx"]
+
+
+def _func_destroy_scheduler_session(sessionId, dask_scheduler):
+    """
+    Remove session data from _raft_comm_state, associated with sessionId
+
+    Parameters
+    ----------
+    sessionId : session Id to be destroyed.
+    dask_scheduler : dask_scheduler object
+        (Note: this is supplied by DASK, not the client)
+    """
+    if sessionId is not None and sessionId in dask_scheduler._raft_comm_state:
+        del dask_scheduler._raft_comm_state[sessionId]
+    else:
+        return 1
+
+    return 0
+
+
+def _func_set_scheduler_as_nccl_root(sessionId, verbose, dask_scheduler):
+    """
+    Creates a persistent nccl uniqueId on the scheduler node.
+
+
+    Parameters
+    ----------
+    sessionId : Associated session to attach the unique ID to.
+    verbose : Indicates whether or not to emit additional information
+    dask_scheduler : dask scheduler object
+        (Note: this is supplied by DASK, not the client)
+
+    Returns
+    -------
+    uniqueId : byte str
+        NCCL uniqueId, associating the DASK scheduler as its root node.
+    """
+    if verbose:
+        logger.info(
+            msg=f"Setting scheduler as NCCL "
+            f"root for sessionId, '{sessionId}'"
+        )
+
+    nccl_uid = set_nccl_root(sessionId=sessionId, state_object=dask_scheduler)
+
+    if verbose:
+        logger.info("Done setting scheduler as NCCL root.")
+
+    return nccl_uid
+
+
+def _func_set_worker_as_nccl_root(sessionId, verbose):
+    """
+    Creates a persistent nccl uniqueId on a worker node.
+
+
+    Parameters
+    ----------
+    sessionId : Associated session to attach the unique ID to.
+    verbose : Indicates whether or not to emit additional information
+
+    Returns
+    -------
+    uniqueId : byte str
+        NCCL uniqueId, associating this DASK worker as its root node.
+    """
+    worker = get_worker()
+    if verbose:
+        worker.log_event(
+            topic="info",
+            msg=f"Setting worker as NCCL root for session, '{sessionId}'",
+        )
+
+    nccl_uid = set_nccl_root(sessionId=sessionId, state_object=worker)
+
+    if verbose:
+        worker.log_event(
+            topic="info", msg="Done setting worker as NCCL root."
+        )
+
+    return nccl_uid
 
 
 def _func_ucp_listener_port():
     return get_ucx().listener_port()
 
 
-async def _func_init_all(sessionId, uniqueId, comms_p2p,
-                         worker_info, verbose, streams_per_handle):
-
-    session_state = worker_state(sessionId)
-    session_state["nccl_uid"] = uniqueId
-    session_state["wid"] = worker_info[get_worker().address]["rank"]
-    session_state["nworkers"] = len(worker_info)
+async def _func_init_all(
+    sessionId, uniqueId, comms_p2p, worker_info, verbose, streams_per_handle
+):
+    worker = get_worker()
+    raft_comm_state = get_raft_comm_state(
+        sessionId=sessionId, state_object=worker
+    )
+    raft_comm_state["nccl_uid"] = uniqueId
+    raft_comm_state["wid"] = worker_info[get_worker().address]["rank"]
+    raft_comm_state["nworkers"] = len(worker_info)
 
     if verbose:
-        print("Initializing NCCL")
+        worker.log_event(topic="info", msg="Initializing NCCL.")
         start = time.time()
 
     _func_init_nccl(sessionId, uniqueId)
 
     if verbose:
         elapsed = time.time() - start
-        print("NCCL Initialization took: %f seconds." % elapsed)
+        worker.log_event(
+            topic="info", msg=f"NCCL Initialization took: {elapsed} seconds."
+        )
 
     if comms_p2p:
         if verbose:
-            print("Initializing UCX Endpoints")
+            worker.log_event(topic="info", msg="Initializing UCX Endpoints")
 
         if verbose:
             start = time.time()
@@ -273,14 +444,16 @@ async def _func_init_all(sessionId, uniqueId, comms_p2p,
 
         if verbose:
             elapsed = time.time() - start
-            print("Done initializing UCX endpoints. Took: %f seconds." %
-                  elapsed)
-            print("Building handle")
+            msg = (
+                f"Done initializing UCX endpoints. "
+                f"Took: {elapsed} seconds.\nBuilding handle."
+            )
+            worker.log_event(topic="info", msg=msg)
 
         _func_build_handle_p2p(sessionId, streams_per_handle, verbose)
 
         if verbose:
-            print("Done building handle.")
+            worker.log_event(topic="info", msg="Done building handle.")
 
     else:
         _func_build_handle(sessionId, streams_per_handle, verbose)
@@ -299,15 +472,22 @@ def _func_init_nccl(sessionId, uniqueId):
         client.
     """
 
-    wid = worker_state(sessionId)["wid"]
-    nWorkers = worker_state(sessionId)["nworkers"]
+    worker = get_worker()
+    raft_comm_state = get_raft_comm_state(
+        sessionId=sessionId, state_object=get_worker()
+    )
+    wid = raft_comm_state["wid"]
+    nWorkers = raft_comm_state["nworkers"]
 
     try:
        n = nccl()
        n.init(nWorkers, uniqueId, wid)
-        worker_state(sessionId)["nccl"] = n
-    except Exception:
-        print("An error occurred initializing NCCL!")
+        raft_comm_state["nccl"] = n
+    except Exception as e:
+        worker.log_event(
+            topic="error", msg=f"An error occurred initializing NCCL: {e}."
+ ) + raise def _func_build_handle_p2p(sessionId, streams_per_handle, verbose): @@ -320,19 +500,34 @@ def _func_build_handle_p2p(sessionId, streams_per_handle, verbose): streams_per_handle : int number of internal streams to create verbose : bool print verbose logging output """ + worker = get_worker() + if verbose: + worker.log_event(topic="info", msg="Building p2p handle.") + ucp_worker = get_ucx().get_worker() - session_state = worker_state(sessionId) + raft_comm_state = get_raft_comm_state( + sessionId=sessionId, state_object=worker + ) handle = Handle(streams_per_handle) - nccl_comm = session_state["nccl"] - eps = session_state["ucp_eps"] - nWorkers = session_state["nworkers"] - workerId = session_state["wid"] + nccl_comm = raft_comm_state["nccl"] + eps = raft_comm_state["ucp_eps"] + nWorkers = raft_comm_state["nworkers"] + workerId = raft_comm_state["wid"] + + if verbose: + worker.log_event(topic="info", msg="Injecting comms on handle.") - inject_comms_on_handle(handle, nccl_comm, ucp_worker, eps, - nWorkers, workerId, verbose) + inject_comms_on_handle( + handle, nccl_comm, ucp_worker, eps, nWorkers, workerId, verbose + ) - worker_state(sessionId)["handle"] = handle + if verbose: + worker.log_event( + topic="info", msg="Finished injecting comms on handle." + ) + + raft_comm_state["handle"] = handle def _func_build_handle(sessionId, streams_per_handle, verbose): @@ -345,24 +540,35 @@ def _func_build_handle(sessionId, streams_per_handle, verbose): streams_per_handle : int number of internal streams to create verbose : bool print verbose logging output """ + worker = get_worker() + if verbose: + worker.log_event( + topic="info", msg="Finished injecting comms on handle." + ) + handle = Handle(streams_per_handle) - session_state = worker_state(sessionId) + raft_comm_state = get_raft_comm_state( + sessionId=sessionId, state_object=worker + ) - workerId = session_state["wid"] - nWorkers = session_state["nworkers"] + workerId = raft_comm_state["wid"] + nWorkers = raft_comm_state["nworkers"] - nccl_comm = session_state["nccl"] - inject_comms_on_handle_coll_only(handle, nccl_comm, nWorkers, - workerId, verbose) - session_state["handle"] = handle + nccl_comm = raft_comm_state["nccl"] + inject_comms_on_handle_coll_only( + handle, nccl_comm, nWorkers, workerId, verbose + ) + raft_comm_state["handle"] = handle def _func_store_initial_state(nworkers, sessionId, uniqueId, wid): - session_state = worker_state(sessionId) - session_state["nccl_uid"] = uniqueId - session_state["wid"] = wid - session_state["nworkers"] = nworkers + raft_comm_state = get_raft_comm_state( + sessionId=sessionId, state_object=get_worker() + ) + raft_comm_state["nccl_uid"] = uniqueId + raft_comm_state["wid"] = wid + raft_comm_state["nworkers"] = nworkers async def _func_ucp_create_endpoints(sessionId, worker_info): @@ -387,18 +593,52 @@ async def _func_ucp_create_endpoints(sessionId, worker_info): eps[worker_info[k]["rank"]] = ep count += 1 - worker_state(sessionId)["ucp_eps"] = eps + raft_comm_state = get_raft_comm_state( + sessionId=sessionId, state_object=get_worker() + ) + raft_comm_state["ucp_eps"] = eps async def _func_destroy_all(sessionId, comms_p2p, verbose=False): - worker_state(sessionId)["nccl"].destroy() - del worker_state(sessionId)["nccl"] - del worker_state(sessionId)["handle"] + worker = get_worker() + if verbose: + worker.log_event(topic="info", msg="Destroying NCCL session state.") + + raft_comm_state = get_raft_comm_state( + sessionId=sessionId, state_object=worker + ) + if "nccl" in raft_comm_state: + 
raft_comm_state["nccl"].destroy() + del raft_comm_state["nccl"] + if verbose: + worker.log_event(topic="info", msg="NCCL session state destroyed.") + else: + if verbose: + worker.log_event( + topic="warning", + msg=f"Session state for, '{sessionId}', " + f"does not contain expected 'nccl' element", + ) + + if verbose: + worker.log_event( + topic="info", + msg=f"Destroying CUDA handle for sessionId, '{sessionId}.'", + ) + + if "handle" in raft_comm_state: + del raft_comm_state["handle"] + else: + if verbose: + worker.log_event( + topic="warning", + msg=f"Session state for, '{sessionId}', " + f"does not contain expected 'handle' element", + ) def _func_ucp_ports(client, workers): - return client.run(_func_ucp_listener_port, - workers=workers) + return client.run(_func_ucp_listener_port, workers=workers) def _func_worker_ranks(workers): diff --git a/python/raft/dask/common/nccl.pyx b/python/raft/dask/common/nccl.pyx index d55a0e4c42..7fc813b515 100644 --- a/python/raft/dask/common/nccl.pyx +++ b/python/raft/dask/common/nccl.pyx @@ -140,18 +140,15 @@ cdef class nccl: cdef int r = rank cdef ncclResult_t result - import time - - start = time.time() with nogil: result = ncclCommInitRank(comm_, nr, deref(ident), r) - end = time.time() if result != ncclSuccess: with nogil: err_str = ncclGetErrorString(result) - print("NCCL_ERROR: %s" % err_str) + + raise RuntimeError("NCCL_ERROR: %s" % err_str) def destroy(self): """ @@ -164,13 +161,14 @@ cdef class nccl: with nogil: result = ncclCommDestroy(deref(comm_)) + free(self.comm) + self.comm = NULL + if result != ncclSuccess: with nogil: err_str = ncclGetErrorString(result) - print("NCCL_ERROR: %s" % err_str) - free(self.comm) - self.comm = NULL + raise RuntimeError("NCCL_ERROR: %s" % err_str) def abort(self): """ @@ -182,12 +180,13 @@ cdef class nccl: with nogil: result = ncclCommAbort(deref(comm_)) + free(comm_) + self.comm = NULL + if result != ncclSuccess: with nogil: err_str = ncclGetErrorString(result) - print("NCCL_ERROR: %s" % err_str) - free(comm_) - self.comm = NULL + raise RuntimeError("NCCL_ERROR: %s" % err_str) def cu_device(self): """ @@ -204,13 +203,15 @@ cdef class nccl: with nogil: result = ncclCommCuDevice(deref(comm_), dev) + ret = dev[0] + free(dev) + if result != ncclSuccess: with nogil: err_str = ncclGetErrorString(result) - print("NCCL_ERROR: %s" % err_str) - ret = dev[0] - free(dev) + raise RuntimeError("NCCL_ERROR: %s" % err_str) + return ret def user_rank(self): @@ -230,13 +231,14 @@ cdef class nccl: with nogil: result = ncclCommUserRank(deref(comm_), rank) + ret = rank[0] + free(rank) + if result != ncclSuccess: with nogil: err_str = ncclGetErrorString(result) - print("NCCL_ERROR: %s" % err_str) + raise RuntimeError("NCCL_ERROR: %s" % err_str) - ret = rank[0] - free(rank) return ret def get_comm(self): diff --git a/python/raft/test/test_comms.py b/python/raft/test/test_comms.py index 5dfe2243c0..7dccb7bbae 100644 --- a/python/raft/test/test_comms.py +++ b/python/raft/test/test_comms.py @@ -15,6 +15,8 @@ import pytest +from collections import OrderedDict + from dask.distributed import Client from dask.distributed import wait @@ -28,6 +30,7 @@ from raft.dask.common import perform_test_comms_allgather from raft.dask.common import perform_test_comms_reducescatter from raft.dask.common import perform_test_comm_split + pytestmark = pytest.mark.mg except ImportError: pytestmark = pytest.mark.skip @@ -65,6 +68,39 @@ def func_test_comm_split(sessionId, n_trials): return perform_test_comm_split(handle, n_trials) +def 
func_check_uid(sessionId, uniqueId, state_object): + if not hasattr(state_object, "_raft_comm_state"): + return 1 + + state = state_object._raft_comm_state + if sessionId not in state: + return 2 + + session_state = state[sessionId] + if "nccl_uid" not in session_state: + return 3 + + nccl_uid = session_state["nccl_uid"] + if nccl_uid != uniqueId: + return 4 + + return 0 + + +def func_check_uid_on_scheduler(sessionId, uniqueId, dask_scheduler): + return func_check_uid( + sessionId=sessionId, uniqueId=uniqueId, state_object=dask_scheduler + ) + + +def func_check_uid_on_worker(sessionId, uniqueId): + from dask.distributed import get_worker + + return func_check_uid( + sessionId=sessionId, uniqueId=uniqueId, state_object=get_worker() + ) + + def test_handles(cluster): client = Client(cluster) @@ -76,11 +112,10 @@ def _has_handle(sessionId): cb = Comms(verbose=True) cb.init() - dfs = [client.submit(_has_handle, - cb.sessionId, - pure=False, - workers=[w]) - for w in cb.worker_addresses] + dfs = [ + client.submit(_has_handle, cb.sessionId, pure=False, workers=[w]) + for w in cb.worker_addresses + ] wait(dfs, timeout=5) assert all(client.compute(dfs, sync=True)) @@ -90,35 +125,83 @@ def _has_handle(sessionId): client.close() -if pytestmark.markname != 'skip': - functions = [perform_test_comms_allgather, - perform_test_comms_allreduce, - perform_test_comms_bcast, - perform_test_comms_reduce, - perform_test_comms_reducescatter] +if pytestmark.markname != "skip": + functions = [ + perform_test_comms_allgather, + perform_test_comms_allreduce, + perform_test_comms_bcast, + perform_test_comms_reduce, + perform_test_comms_reducescatter, + ] else: functions = [None] -@pytest.mark.parametrize("func", functions) -@pytest.mark.nccl -def test_collectives(client, func): +@pytest.mark.parametrize("root_location", ["client", "worker", "scheduler"]) +def test_nccl_root_placement(client, root_location): - cb = Comms(verbose=True) - cb.init() + cb = None + try: + cb = Comms( + verbose=True, client=client, nccl_root_location=root_location + ) + cb.init() - for k, v in cb.worker_info(cb.worker_addresses).items(): + worker_addresses = list( + OrderedDict.fromkeys(client.scheduler_info()["workers"].keys()) + ) + + if root_location in ("worker",): + result = client.run( + func_check_uid_on_worker, + cb.sessionId, + cb.uniqueId, + workers=[worker_addresses[0]], + )[worker_addresses[0]] + elif root_location in ("scheduler",): + result = client.run_on_scheduler( + func_check_uid_on_scheduler, cb.sessionId, cb.uniqueId + ) + else: + result = int(cb.uniqueId is None) + + assert result == 0 + + finally: + if cb: + cb.destroy() - dfs = [client.submit(func_test_collective, - func, - cb.sessionId, - v["rank"], - pure=False, - workers=[w]) - for w in cb.worker_addresses] - wait(dfs, timeout=5) - assert all([x.result() for x in dfs]) +@pytest.mark.parametrize("func", functions) +@pytest.mark.parametrize("root_location", ["client", "worker", "scheduler"]) +@pytest.mark.nccl +def test_collectives(client, func, root_location): + + try: + cb = Comms( + verbose=True, client=client, nccl_root_location=root_location + ) + cb.init() + + for k, v in cb.worker_info(cb.worker_addresses).items(): + + dfs = [ + client.submit( + func_test_collective, + func, + cb.sessionId, + v["rank"], + pure=False, + workers=[w], + ) + for w in cb.worker_addresses + ] + wait(dfs, timeout=5) + + assert all([x.result() for x in dfs]) + finally: + if cb: + cb.destroy() @pytest.mark.nccl @@ -127,12 +210,12 @@ def test_comm_split(client): cb = 
Comms(comms_p2p=True, verbose=True) cb.init() - dfs = [client.submit(func_test_comm_split, - cb.sessionId, - 3, - pure=False, - workers=[w]) - for w in cb.worker_addresses] + dfs = [ + client.submit( + func_test_comm_split, cb.sessionId, 3, pure=False, workers=[w] + ) + for w in cb.worker_addresses + ] wait(dfs, timeout=5) @@ -146,13 +229,17 @@ def test_send_recv(n_trials, client): cb = Comms(comms_p2p=True, verbose=True) cb.init() - dfs = [client.submit(func_test_send_recv, - cb.sessionId, - n_trials, - pure=False, - workers=[w]) - for w in cb.worker_addresses] + dfs = [ + client.submit( + func_test_send_recv, + cb.sessionId, + n_trials, + pure=False, + workers=[w], + ) + for w in cb.worker_addresses + ] wait(dfs, timeout=5) - assert(list(map(lambda x: x.result(), dfs))) + assert list(map(lambda x: x.result(), dfs))
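
Usage reference for reviewers (not part of the patch): a minimal sketch of the new `nccl_root_location` option added by this PR. It assumes a `dask_cuda.LocalCUDACluster` is available; any Dask cluster with GPU workers should work the same way, and it only uses APIs touched in this diff (`Comms`, `local_handle`).

```python
# Minimal sketch, assuming dask_cuda is installed and GPUs are visible.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

from raft.dask.common.comms import Comms, local_handle


def _has_handle(sessionId):
    # Runs on each worker; the raft handle itself never leaves the worker.
    return local_handle(sessionId) is not None


cluster = LocalCUDACluster()
client = Client(cluster)

# nccl_root_location may be "client", "worker", or "scheduler" (the default).
# With "scheduler", the NCCL uniqueId is created and cached on the Dask
# scheduler and is cleaned up again by Comms.destroy().
cb = Comms(client=client, verbose=True, nccl_root_location="scheduler")
cb.init()

try:
    # Every participating worker should now hold an injected raft handle
    # for this session.
    results = client.run(_has_handle, cb.sessionId, workers=cb.worker_addresses)
    assert all(results.values())
finally:
    cb.destroy()
    client.close()
    cluster.close()
```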