From 00c040184794b41ce36a3efe99e8277f884637ec Mon Sep 17 00:00:00 2001 From: "Corey J. Nolet" Date: Fri, 28 May 2021 20:07:06 -0400 Subject: [PATCH] Python comms to hold onto server endpoints (#241) The barrier synchronous communication pattern of the RAFT comms allows the senders and receivers to know ahead of time when a message needs to be initiated. Because of this, we only need to place the rank of the sender in 32-bits of the tag for the receiver and we only use UCX endpoints for sending messages while the receiver uses the `ucx_worker` to receive. This is a little different than the fully asynchronous pattern of ucx-py, where an endpoint is created in the connection listener and its reference held in order to send messages asynchronously at a later time. Still, this endpoint's life cycle is also expected to be managed by the user. We are still not entirely sure why this additional endpoint causes issues under some circumstances and not others - for example, we might never encounter an issue with one configuration, while another configuration may fail every single time (such as a timeout, lockup, or explicit error). @pentschev and I tested this change on his configuration on UCX 1.11 w/ the latest dask/distributed and it appears to fix the hang. I have also tested that it runs on UCX 1.9 successfully. In my tests, I run an MNMG nearest neighbors on 50k rows. Below are the configuration options we used: UCX 1.9 ``` export DASK_UCX__CUDA_COPY=True export DASK_UCX__TCP=True export DASK_UCX__NVLINK=True export DASK_UCX__INFINIBAND=True export DASK_UCX__RDMACM=False export DASK_RMM__POOL_SIZE=0.5GB export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s" export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s" export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s" export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s" export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False" export DASK_UCX__REUSE_ENDPOINTS=True export UCXPY_IFNAME="ib0" export UCX_NET_DEVICES=all export UCX_MAX_RNDV_RAILS=1 # <-- must be set in the client env too! export DASK_LOGGING__DISTRIBUTED="DEBUG" export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 SCHEDULER_FILE=${SHARED_DIR}/dask-scheduler.json SCHEDULER_ARGS="--protocol ucx --port 8792 --interface ib0 --scheduler-file $SCHEDULER_FILE" WORKER_ARGS="--enable-tcp-over-ucx --enable-nvlink --enable-infiniband --rmm-pool-size=1G --net-devices="ib0" --local-directory /tmp/$LOGNAME --scheduler-file $SCHEDULER_FILE" ``` UCX 1.11 ``` export DASK_UCX__CUDA_COPY=True export DASK_UCX__TCP=True export DASK_UCX__NVLINK=True export DASK_UCX__INFINIBAND=True export DASK_UCX__RDMACM=True export DASK_RMM__POOL_SIZE=0.5GB export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s" export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s" export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s" export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s" export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False" export DASK_UCX__REUSE_ENDPOINTS=True export UCXPY_IFNAME="ib0" export UCX_MAX_RNDV_RAILS=1 # <-- must be set in the client env too! export DASK_LOGGING__DISTRIBUTED="DEBUG" export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 SCHEDULER_FILE=${SHARED_DIR}/dask-scheduler.json SCHEDULER_ARGS="--protocol ucx --port 8792 --interface ib0 --scheduler-file $SCHEDULER_FILE" WORKER_ARGS="--enable-tcp-over-ucx --enable-nvlink --enable-infiniband --enable-rdmacm --rmm-pool-size=1G --local-directory /tmp/$LOGNAME --scheduler-file $SCHEDULER_FILE" ``` And for the client: UCX 1.9 ``` initialize(enable_tcp_over_ucx=True, enable_nvlink=True, enable_infiniband=True, enable_rdmacm=False, ) ``` UCX 1.11 ``` initialize(enable_tcp_over_ucx=True, enable_nvlink=True, enable_infiniband=True, enable_rdmacm=True, ) ``` Also tagging @rlratzel Authors: - Corey J. Nolet (https://github.com/cjnolet) Approvers: - Divye Gala (https://github.com/divyegala) URL: https://github.com/rapidsai/raft/pull/241 --- python/raft/dask/common/ucx.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/raft/dask/common/ucx.py b/python/raft/dask/common/ucx.py index 7e44f3bc43..f61479a0eb 100644 --- a/python/raft/dask/common/ucx.py +++ b/python/raft/dask/common/ucx.py @@ -17,7 +17,7 @@ async def _connection_func(ep): - return 0 + UCX.get().add_server_endpoint(ep) class UCX: @@ -35,6 +35,7 @@ def __init__(self, listener_callback): self._create_listener() self._endpoints = {} + self._server_endpoints = [] assert UCX.__instance is None @@ -60,6 +61,9 @@ async def _create_endpoint(self, ip, port): self._endpoints[(ip, port)] = ep return ep + def add_server_endpoint(self, ep): + self._server_endpoints.append(ep) + async def get_endpoint(self, ip, port): if (ip, port) not in self._endpoints: ep = await self._create_endpoint(ip, port) @@ -72,10 +76,18 @@ async def close_endpoints(self): for k, ep in self._endpoints.items(): await ep.close() + for ep in self._server_endpoints: + ep.close() + def __del__(self): for ip_port, ep in self._endpoints.items(): if not ep.closed(): ep.abort() del ep + for ep in self._server_endpoints: + if not ep.closed(): + ep.abort() + del ep + self._listener.close()