From 438e5a217dde1e3f530f9d3736074bf7cc439b02 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 11 Oct 2023 20:16:43 +0200
Subject: [PATCH] Timeout for `exchange_peer_info` and fix for AM tests (#994)

While debugging Distributed tests with UCX it was observed that
`exchange_peer_info` sometimes hangs indefinitely, specifically when executing
`stream_recv` on the client side. The cause is unknown, but messages are
believed to be lost either when multiple stream messages are transferred
simultaneously among various endpoints, or when the receiving end takes too
long to launch `stream_recv`; see https://github.com/rapidsai/ucx-py/pull/509
for a similar issue related to the stream API. Adding a timeout does not allow
recovery, but it does let a UCX-Py client retry establishing the endpoint
after a failure. This change seems to resolve
https://github.com/dask/distributed/issues/5229; at the very least, the issue
is no longer reproducible locally with this change.

Additionally, do a roundtrip message transfer in `test_send_recv_am`, which
should resolve https://github.com/rapidsai/ucx-py/issues/797. That failure
seems to be caused by checking received messages too early, before they are
actually received by the listener; a roundtrip ensures the client receives the
reply and thus prevents checking for a transfer that has not completed yet.

Also ensure the listener is closed before validating the `test_close_callback`
conditions, which was flaky as well.

Finally, ensure the event loop is closed in the test fixture, preventing
`DeprecationWarning`s from pytest-asyncio, which currently closes unclosed
event loops but will stop doing so in future releases.

Closes #797

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Ray Douglass (https://github.com/raydouglass)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Lawrence Mitchell (https://github.com/wence-)

URL: https://github.com/rapidsai/ucx-py/pull/994
---
 ci/test_wheel.sh           |  3 +--
 tests/conftest.py          |  1 +
 tests/test_endpoint.py     |  5 +++++
 tests/test_send_recv_am.py |  9 +++++----
 ucp/core.py                | 22 +++++++++++++++++-----
 5 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/ci/test_wheel.sh b/ci/test_wheel.sh
index 6ec229fb3..513bdb71d 100755
--- a/ci/test_wheel.sh
+++ b/ci/test_wheel.sh
@@ -13,5 +13,4 @@ python -m pip install $(echo ./dist/ucx_py*.whl)[test]
 cd tests
 python -m pytest --cache-clear -vs .
 cd ../ucp
-# skipped test context: https://github.com/rapidsai/ucx-py/issues/797
-python -m pytest -k 'not test_send_recv_am' --cache-clear -vs ./_libs/tests/
+python -m pytest --cache-clear -vs ./_libs/tests/
diff --git a/tests/conftest.py b/tests/conftest.py
index 27896d968..2b7621ea5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -48,3 +48,4 @@ def event_loop(scope="session"):
     yield loop
     ucp.reset()
     loop.run_until_complete(asyncio.sleep(0))
+    loop.close()
diff --git a/tests/test_endpoint.py b/tests/test_endpoint.py
index 173fe8e18..aa95f5596 100644
--- a/tests/test_endpoint.py
+++ b/tests/test_endpoint.py
@@ -1,3 +1,5 @@
+import asyncio
+
 import pytest
 
 import ucp
@@ -16,6 +18,7 @@ async def server_node(ep):
         ep.set_close_callback(_close_callback)
         if server_close_callback is False:
             await ep.close()
+        listener.close()
 
     async def client_node(port):
         ep = await ucp.create_endpoint(
@@ -31,6 +34,8 @@ async def client_node(port):
         server_node,
     )
     await client_node(listener.port)
+    while not listener.closed():
+        await asyncio.sleep(0.01)
     assert closed[0] is True
diff --git a/tests/test_send_recv_am.py b/tests/test_send_recv_am.py
index 15bc32f3a..d9aabaa8a 100644
--- a/tests/test_send_recv_am.py
+++ b/tests/test_send_recv_am.py
@@ -52,13 +52,13 @@ def get_data():
 
 def simple_server(size, recv):
     async def server(ep):
-        recv.append(await ep.am_recv())
+        recv = await ep.am_recv()
+        await ep.am_send(recv)
         await ep.close()
 
     return server
 
 
-@pytest.mark.flaky(reruns=3, reruns_delay=1)
 @pytest.mark.asyncio
 @pytest.mark.parametrize("size", msg_sizes)
 @pytest.mark.parametrize("blocking_progress_mode", [True, False])
@@ -88,6 +88,7 @@ async def test_send_recv_am(size, blocking_progress_mode, recv_wait, data):
         # immediately as receive data is already available.
         await asyncio.sleep(1)
     await c.am_send(msg)
+    recv_msg = await c.am_recv()
     for c in clients:
         await c.close()
     listener.close()
@@ -95,6 +96,6 @@ async def test_send_recv_am(size, blocking_progress_mode, recv_wait, data):
     if data["memory_type"] == "cuda" and msg.nbytes < rndv_thresh:
         # Eager messages are always received on the host, if no host
         # allocator is registered UCX-Py defaults to `bytearray`.
-        assert recv[0] == bytearray(msg.get())
+        assert recv_msg == bytearray(msg.get())
     else:
-        data["validator"](recv[0], msg)
+        data["validator"](recv_msg, msg)
diff --git a/ucp/core.py b/ucp/core.py
index 6f5ddf3c0..8da1ef4e6 100644
--- a/ucp/core.py
+++ b/ucp/core.py
@@ -35,7 +35,7 @@ def _get_ctx():
     return _ctx
 
 
-async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener):
+async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener, stream_timeout=5.0):
     """Help function that exchange endpoint information"""
 
     # Pack peer information incl. a checksum
@@ -48,11 +48,23 @@ async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener):
     # Send/recv peer information.
     # Notice, we force an `await` between the two streaming calls (see )
     if listener is True:
-        await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
-        await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
+        await asyncio.wait_for(
+            comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes),
+            timeout=stream_timeout,
+        )
+        await asyncio.wait_for(
+            comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes),
+            timeout=stream_timeout,
+        )
     else:
-        await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
-        await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes)
+        await asyncio.wait_for(
+            comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes),
+            timeout=stream_timeout,
+        )
+        await asyncio.wait_for(
+            comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes),
+            timeout=stream_timeout,
+        )
 
     # Unpacking and sanity check of the peer information
     ret = {}
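
For reference, below is a minimal, self-contained sketch (not part of the patch)
of the pattern the `ucp/core.py` change relies on: wrapping a potentially
hanging `await` in `asyncio.wait_for` so the caller can catch
`asyncio.TimeoutError` and retry. The `fetch_peer_info` coroutine and the retry
parameters are hypothetical stand-ins for the real `exchange_peer_info` and
endpoint-creation machinery.

    import asyncio


    async def fetch_peer_info(delay: float) -> bytes:
        # Hypothetical stand-in for a stream_recv that may never complete.
        await asyncio.sleep(delay)
        return b"peer-info"


    async def connect_with_retry(attempts: int = 3, stream_timeout: float = 5.0) -> bytes:
        for attempt in range(1, attempts + 1):
            try:
                # Same construct the patch adds around stream_send/stream_recv:
                # give up after `stream_timeout` seconds instead of hanging forever.
                return await asyncio.wait_for(fetch_peer_info(0.1), timeout=stream_timeout)
            except asyncio.TimeoutError:
                # The timeout does not recover the lost message; it only lets the
                # client abandon this attempt and try to establish the endpoint again.
                print(f"peer info exchange timed out (attempt {attempt}/{attempts})")
        raise RuntimeError("could not establish endpoint")


    if __name__ == "__main__":
        print(asyncio.run(connect_with_retry()))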
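
Similarly, a hedged sketch of the roundtrip pattern adopted in
`test_send_recv_am`: the server echoes every active message back, so the
client's `am_recv()` only completes after the server has actually received the
original message, removing the race of validating the server-side buffer too
early. Host/port handling, initialization details, and the message contents are
illustrative; a working UCX-Py installation with AM support is assumed.

    import asyncio

    import ucp


    async def main():
        async def echo_server(ep):
            # Echo the received active message back to the sender, then close.
            msg = await ep.am_recv()
            await ep.am_send(msg)
            await ep.close()

        listener = ucp.create_listener(echo_server)
        ep = await ucp.create_endpoint(ucp.get_address(), listener.port)

        await ep.am_send(b"ping")
        reply = await ep.am_recv()  # only completes once the server got "ping"
        assert reply == b"ping"

        await ep.close()
        listener.close()


    if __name__ == "__main__":
        asyncio.run(main())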