diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index e935da7f..bbf5c856 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -33,7 +33,7 @@ export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 # Install dask and distributed from master branch. Usually needed during # development time and disabled before a new dask-cuda release. -export INSTALL_DASK_MASTER=0 +export INSTALL_DASK_MASTER=1 ################################################################################ # SETUP - Check environment diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 05f0b515..ecabafe4 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -241,7 +241,7 @@ def del_pid_file(): name=name if nprocs == 1 or not name else str(name) + "-" + str(i), local_directory=local_directory, config={ - "ucx": get_ucx_config( + "distributed.comm.ucx": get_ucx_config( enable_tcp_over_ucx=enable_tcp_over_ucx, enable_infiniband=enable_infiniband, enable_nvlink=enable_nvlink, diff --git a/dask_cuda/initialize.py b/dask_cuda/initialize.py index 416a7d6e..b312652f 100644 --- a/dask_cuda/initialize.py +++ b/dask_cuda/initialize.py @@ -92,7 +92,7 @@ def initialize( net_devices=net_devices, cuda_device_index=cuda_device_index, ) - dask.config.update(dask.config.global_config, {"ucx": ucx_config}, priority="new") + dask.config.set({"distributed.comm.ucx": ucx_config}) @click.command() diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 26831f60..d6fb10fc 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -339,7 +339,7 @@ def __init__( protocol=protocol, worker_class=worker_class, config={ - "ucx": get_ucx_config( + "distributed.comm.ucx": get_ucx_config( enable_tcp_over_ucx=enable_tcp_over_ucx, enable_nvlink=enable_nvlink, enable_infiniband=enable_infiniband, diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py index cb99de1b..f26351e4 100644 --- a/dask_cuda/tests/test_initialize.py +++ b/dask_cuda/tests/test_initialize.py @@ -29,7 +29,7 @@ def _test_initialize_ucx_tcp(): n_workers=1, threads_per_worker=1, processes=True, - config={"ucx": get_ucx_config(**kwargs)}, + config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: res = da.from_array(numpy.arange(10000), chunks=(1000,)) @@ -68,7 +68,7 @@ def _test_initialize_ucx_nvlink(): n_workers=1, threads_per_worker=1, processes=True, - config={"ucx": get_ucx_config(**kwargs)}, + config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: res = da.from_array(numpy.arange(10000), chunks=(1000,)) @@ -110,7 +110,7 @@ def _test_initialize_ucx_infiniband(): n_workers=1, threads_per_worker=1, processes=True, - config={"ucx": get_ucx_config(**kwargs)}, + config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: res = da.from_array(numpy.arange(10000), chunks=(1000,))