Skip to content

Commit

Permalink
Timeout for exchange_peer_info and fix for AM tests (#994)
Browse files Browse the repository at this point in the history
After some debugging of Distributed tests with UCX it was observed that sometimes `exchange_peer_info` hangs indefinitely, specifically when executing `stream_recv` on the client side. The causes for this is unknown but believed to be due to messages being lost if there's either multiple stream messages being transferred simultaneously among various endpoints or being lost due to the receiving end taking too long to launch `stream_recv`, see #509 for a similar issue related to stream API. By adding a timeout doesn't allow recovery, but at least allows a UCX-Py client to retry upon failure to establish the endpoint.

This change seems to resolve dask/distributed#5229, at least it isn't reproducible locally with this change.

Additionally do a roundtrip message transfer for `test_send_recv_am, which should resolve #797 and seems to be caused by checking received messages too early, before they are actually received by the listener. A roundtrip ensures the client receives the reply and thus prevents us from the checking for a transfer that didn't complete yet.

Ensure now also that the listener is closed before validating `test_close_callback` conditions, which was also flaky.

Finally, ensure we close the loop in test fixture, thus preventing `DeprecationWarning`s from pytest-asyncio
which currently closes unclosed event loop but will stop doing that in future releases.

Closes #797

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Ray Douglass (https://github.com/raydouglass)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #994
  • Loading branch information
pentschev authored Oct 11, 2023
1 parent 69d7ee1 commit 438e5a2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 11 deletions.
3 changes: 1 addition & 2 deletions ci/test_wheel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ python -m pip install $(echo ./dist/ucx_py*.whl)[test]
cd tests
python -m pytest --cache-clear -vs .
cd ../ucp
# skipped test context: https://github.com/rapidsai/ucx-py/issues/797
python -m pytest -k 'not test_send_recv_am' --cache-clear -vs ./_libs/tests/
python -m pytest --cache-clear -vs ./_libs/tests/
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ def event_loop(scope="session"):
yield loop
ucp.reset()
loop.run_until_complete(asyncio.sleep(0))
loop.close()
5 changes: 5 additions & 0 deletions tests/test_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

import pytest

import ucp
Expand All @@ -16,6 +18,7 @@ async def server_node(ep):
ep.set_close_callback(_close_callback)
if server_close_callback is False:
await ep.close()
listener.close()

async def client_node(port):
ep = await ucp.create_endpoint(
Expand All @@ -31,6 +34,8 @@ async def client_node(port):
server_node,
)
await client_node(listener.port)
while not listener.closed():
await asyncio.sleep(0.01)
assert closed[0] is True


Expand Down
9 changes: 5 additions & 4 deletions tests/test_send_recv_am.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ def get_data():

def simple_server(size, recv):
async def server(ep):
recv.append(await ep.am_recv())
recv = await ep.am_recv()
await ep.am_send(recv)
await ep.close()

return server


@pytest.mark.flaky(reruns=3, reruns_delay=1)
@pytest.mark.asyncio
@pytest.mark.parametrize("size", msg_sizes)
@pytest.mark.parametrize("blocking_progress_mode", [True, False])
Expand Down Expand Up @@ -88,13 +88,14 @@ async def test_send_recv_am(size, blocking_progress_mode, recv_wait, data):
# immediately as receive data is already available.
await asyncio.sleep(1)
await c.am_send(msg)
recv_msg = await c.am_recv()
for c in clients:
await c.close()
listener.close()

if data["memory_type"] == "cuda" and msg.nbytes < rndv_thresh:
# Eager messages are always received on the host, if no host
# allocator is registered UCX-Py defaults to `bytearray`.
assert recv[0] == bytearray(msg.get())
assert recv_msg == bytearray(msg.get())
else:
data["validator"](recv[0], msg)
data["validator"](recv_msg, msg)
22 changes: 17 additions & 5 deletions ucp/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _get_ctx():
return _ctx


async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener):
async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener, stream_timeout=5.0):
"""Help function that exchange endpoint information"""

# Pack peer information incl. a checksum
Expand All @@ -48,11 +48,23 @@ async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener):
# Send/recv peer information. Notice, we force an `await` between the two
# streaming calls (see <https://github.com/rapidsai/ucx-py/pull/509>)
if listener is True:
await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
await asyncio.wait_for(
comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes),
timeout=stream_timeout,
)
await asyncio.wait_for(
comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes),
timeout=stream_timeout,
)
else:
await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
await asyncio.wait_for(
comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes),
timeout=stream_timeout,
)
await asyncio.wait_for(
comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes),
timeout=stream_timeout,
)

# Unpacking and sanity check of the peer information
ret = {}
Expand Down

0 comments on commit 438e5a2

Please sign in to comment.