diff --git a/ucp/core.py b/ucp/core.py index 6f5ddf3c0..8da1ef4e6 100644 --- a/ucp/core.py +++ b/ucp/core.py @@ -35,7 +35,7 @@ def _get_ctx(): return _ctx -async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener): +async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener, stream_timeout=5.0): """Help function that exchange endpoint information""" # Pack peer information incl. a checksum @@ -48,11 +48,23 @@ async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener): # Send/recv peer information. Notice, we force an `await` between the two # streaming calls (see ) if listener is True: - await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes) - await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes) + await asyncio.wait_for( + comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes), + timeout=stream_timeout, + ) + await asyncio.wait_for( + comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes), + timeout=stream_timeout, + ) else: - await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes) - await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes) + await asyncio.wait_for( + comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes), + timeout=stream_timeout, + ) + await asyncio.wait_for( + comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes), + timeout=stream_timeout, + ) # Unpacking and sanity check of the peer information ret = {}