Skip to content

Commit

Permalink
Add a timeout to exchange_peer_info
Browse files Browse the repository at this point in the history
After some debugging of Distributed tests with UCX it was observed that
sometimes `exchange_peer_info` hangs indefinitely, specifically when
executing `stream_recv` on the client side. The causes for this is
unknown but believed to be due to messages being lost if there's either
multiple stream messages being transferred simultaneously among various
endpoints or being lost due to the receiving end taking too long to
launch `stream_recv`, see #509
for a similar issue related to stream API. By adding a timeout doesn't
allow recovery, but at least allows a UCX-Py client to retry upon
failure to establish the endpoint.
  • Loading branch information
pentschev committed Oct 11, 2023
1 parent 69d7ee1 commit a22baf7
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions ucp/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _get_ctx():
return _ctx


async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener):
async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener, stream_timeout=5.0):
"""Help function that exchange endpoint information"""

# Pack peer information incl. a checksum
Expand All @@ -48,11 +48,23 @@ async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener):
# Send/recv peer information. Notice, we force an `await` between the two
# streaming calls (see <https://github.com/rapidsai/ucx-py/pull/509>)
if listener is True:
await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
await asyncio.wait_for(
comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes),
timeout=stream_timeout,
)
await asyncio.wait_for(
comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes),
timeout=stream_timeout,
)
else:
await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
await asyncio.wait_for(
comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes),
timeout=stream_timeout,
)
await asyncio.wait_for(
comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes),
timeout=stream_timeout,
)

# Unpacking and sanity check of the peer information
ret = {}
Expand Down

0 comments on commit a22baf7

Please sign in to comment.