Python comms to hold onto server endpoints (rapidsai#241)
The barrier-synchronous communication pattern of the RAFT comms lets senders and receivers know ahead of time when a message needs to be initiated. Because of this, we only need to place the rank of the sender in 32 bits of the tag for the receiver, and we only use UCX endpoints for sending messages, while the receiver uses the `ucx_worker` to receive. This is a little different from the fully asynchronous pattern of ucx-py, where an endpoint is created in the connection listener and a reference to it is held in order to send messages asynchronously at a later time. Still, that endpoint's life cycle is also expected to be managed by the user.
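
To illustrate the idea, here is a minimal sketch of packing a sender rank into the tag (the helper names and the exact bit layout are illustrative assumptions, not code from this PR):

```
# Hypothetical sketch: the receiver only needs to know who is sending, so
# the sender's rank occupies 32 bits of the 64-bit UCX tag and the
# remaining bits carry the message tag.

def build_tag(msg_tag: int, sender_rank: int) -> int:
    """Pack a message tag and the sender's rank into a single 64-bit tag."""
    assert 0 <= sender_rank < 2**32
    return ((msg_tag & 0xFFFFFFFF) << 32) | sender_rank

def rank_from_tag(tag: int) -> int:
    """Recover the sender's rank on the receiving side."""
    return tag & 0xFFFFFFFF

assert rank_from_tag(build_tag(0x1, 3)) == 3
```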

We are still not entirely sure why this additional endpoint causes issues under some circumstances and not others: one configuration may never encounter an issue, while another fails every single time (with a timeout, lockup, or explicit error).

@pentschev and I tested this change on his configuration on UCX 1.11 with the latest dask/distributed, and it appears to fix the hang. I have also verified that it runs successfully on UCX 1.9. In my tests, I ran an MNMG nearest neighbors workload on 50k rows. Below are the configuration options we used:

UCX 1.9
```
export DASK_UCX__CUDA_COPY=True
export DASK_UCX__TCP=True
export DASK_UCX__NVLINK=True
export DASK_UCX__INFINIBAND=True
export DASK_UCX__RDMACM=False
export DASK_RMM__POOL_SIZE=0.5GB
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s"
export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s"
export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s"
export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False"

export DASK_UCX__REUSE_ENDPOINTS=True
export UCXPY_IFNAME="ib0"
export UCX_NET_DEVICES=all
export UCX_MAX_RNDV_RAILS=1  # <-- must be set in the client env too!
export DASK_LOGGING__DISTRIBUTED="DEBUG"
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7

SCHEDULER_FILE=${SHARED_DIR}/dask-scheduler.json

SCHEDULER_ARGS="--protocol ucx  --port 8792
                --interface ib0
                --scheduler-file $SCHEDULER_FILE"

WORKER_ARGS="--enable-tcp-over-ucx
             --enable-nvlink 
             --enable-infiniband
             --rmm-pool-size=1G 
             --net-devices="ib0"
             --local-directory /tmp/$LOGNAME 
             --scheduler-file $SCHEDULER_FILE"
```

UCX 1.11
```
export DASK_UCX__CUDA_COPY=True
export DASK_UCX__TCP=True
export DASK_UCX__NVLINK=True
export DASK_UCX__INFINIBAND=True
export DASK_UCX__RDMACM=True
export DASK_RMM__POOL_SIZE=0.5GB
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s"
export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s"
export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s"
export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False"

export DASK_UCX__REUSE_ENDPOINTS=True
export UCXPY_IFNAME="ib0"
export UCX_MAX_RNDV_RAILS=1  # <-- must be set in the client env too!
export DASK_LOGGING__DISTRIBUTED="DEBUG"
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7

SCHEDULER_FILE=${SHARED_DIR}/dask-scheduler.json

SCHEDULER_ARGS="--protocol ucx  --port 8792
                --interface ib0
                --scheduler-file $SCHEDULER_FILE"

WORKER_ARGS="--enable-tcp-over-ucx
             --enable-nvlink 
             --enable-infiniband
             --enable-rdmacm
             --rmm-pool-size=1G 
             --local-directory /tmp/$LOGNAME 
             --scheduler-file $SCHEDULER_FILE"
```
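
As a hypothetical sanity check (not from the PR), Dask's config system maps `DASK_UCX__*` environment variables into its `ucx` config namespace, so the settings above can be verified from Python before starting workers:

```
# Assumes the environment variables above were exported before Python started.
import dask

# DASK_UCX__NVLINK=True becomes the nested config key "ucx.nvlink".
assert dask.config.get("ucx.nvlink") is True
assert dask.config.get("ucx.infiniband") is True
```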


And for the client:

UCX 1.9
```
from dask_cuda.initialize import initialize

initialize(enable_tcp_over_ucx=True,
           enable_nvlink=True,
           enable_infiniband=True,
           enable_rdmacm=False,
          )
```

UCX 1.11
```
from dask_cuda.initialize import initialize

initialize(enable_tcp_over_ucx=True,
           enable_nvlink=True,
           enable_infiniband=True,
           enable_rdmacm=True,
          )
```
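
After `initialize()`, the client would typically connect through the same scheduler file the scheduler wrote above (a sketch; the path is an assumed stand-in for `$SCHEDULER_FILE` and is not part of the PR):

```
from dask.distributed import Client

# The path is illustrative; it should match the $SCHEDULER_FILE written by
# the scheduler above.
client = Client(scheduler_file="/shared/dask-scheduler.json")
print(client)
```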

Also tagging @rlratzel

Authors:
  - Corey J. Nolet (https://github.com/cjnolet)

Approvers:
  - Divye Gala (https://github.com/divyegala)

URL: rapidsai#241
cjnolet authored May 29, 2021
1 parent d576348 commit 00c0401
Showing 1 changed file with 13 additions and 1 deletion.
python/raft/dask/common/ucx.py (13 additions, 1 deletion):

```
@@ -17,7 +17,7 @@
 
 
 async def _connection_func(ep):
-    return 0
+    UCX.get().add_server_endpoint(ep)
 
 
 class UCX:
@@ -35,6 +35,7 @@ def __init__(self, listener_callback):
 
         self._create_listener()
         self._endpoints = {}
+        self._server_endpoints = []
 
         assert UCX.__instance is None
 
@@ -60,6 +61,9 @@ async def _create_endpoint(self, ip, port):
         self._endpoints[(ip, port)] = ep
         return ep
 
+    def add_server_endpoint(self, ep):
+        self._server_endpoints.append(ep)
+
     async def get_endpoint(self, ip, port):
         if (ip, port) not in self._endpoints:
             ep = await self._create_endpoint(ip, port)
@@ -72,10 +76,18 @@ async def close_endpoints(self):
         for k, ep in self._endpoints.items():
             await ep.close()
 
+        for ep in self._server_endpoints:
+            ep.close()
+
     def __del__(self):
         for ip_port, ep in self._endpoints.items():
             if not ep.closed():
                 ep.abort()
             del ep
 
+        for ep in self._server_endpoints:
+            if not ep.closed():
+                ep.abort()
+            del ep
+
         self._listener.close()
```
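
The heart of the fix is visible in `_connection_func` above: ucx-py invokes the listener callback with each newly accepted server-side endpoint, and the singleton now stores a reference instead of letting the endpoint be torn down while the peer may still be sending. A rough sketch of that interaction, using a plain list as a stand-in for the `UCX` singleton (assumes the listener is created via `ucp.create_listener`, as in ucx-py; not a verbatim excerpt of this file):

```
import ucp

# Minimal stand-in for the UCX singleton: holding references here keeps
# accepted endpoints from being garbage collected prematurely.
server_endpoints = []

# ucx-py calls this once per incoming connection with a live endpoint.
async def _connection_func(ep):
    server_endpoints.append(ep)

listener = ucp.create_listener(_connection_func)
print("listening on port", listener.port)
```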
