diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py b/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py index 04843a5df29..c3b58a6be3c 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_monitor.py @@ -26,6 +26,36 @@ async def async_zmq_server(port, handler): zmq_context.destroy() +async def test_monitor_connects_and_disconnects_successfully(unused_tcp_port): + ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}") + monitor = Monitor(ee_con_info) + + messages = [] + + async def mock_event_handler(router_socket): + nonlocal messages + while True: + dealer, _, frame = await router_socket.recv_multipart() + await router_socket.send_multipart([dealer, b"", ACK_MSG]) + frame = frame.decode("utf-8") + messages.append((dealer.decode("utf-8"), frame)) + if frame == DISCONNECT_MSG: + break + + websocket_server_task = asyncio.create_task( + async_zmq_server(unused_tcp_port, mock_event_handler) + ) + async with monitor: + pass + await websocket_server_task + dealer, msg = messages[0] + assert dealer.startswith("client-") + assert msg == CONNECT_MSG + dealer, msg = messages[1] + assert dealer.startswith("client-") + assert msg == DISCONNECT_MSG + + async def test_no_connection_established(make_ee_config): ee_config = make_ee_config() monitor = Monitor(ee_config.get_connection_info()) @@ -43,21 +73,19 @@ async def test_immediate_stop(unused_tcp_port): async def mock_event_handler(router_socket): nonlocal connected while True: - dealer, _, *frames = await router_socket.recv_multipart() + dealer, _, frame = await router_socket.recv_multipart() await router_socket.send_multipart([dealer, b"", ACK_MSG]) dealer = dealer.decode("utf-8") - for frame in frames: - frame = frame.decode("utf-8") - assert dealer.startswith("client-") - if frame == CONNECT_MSG: - connected = True - elif frame == DISCONNECT_MSG: - connected = False - return - else: - event = event_from_json(frame) - assert connected - assert type(event) is EEUserDone + frame = frame.decode("utf-8") + if frame == CONNECT_MSG: + connected = True + elif frame == DISCONNECT_MSG: + connected = False + return + else: + event = event_from_json(frame) + assert connected + assert type(event) is EEUserDone websocket_server_task = asyncio.create_task( async_zmq_server(unused_tcp_port, mock_event_handler) @@ -107,21 +135,19 @@ async def test_that_monitor_track_can_exit_without_terminated_event_from_evaluat async def mock_event_handler(router_socket): nonlocal connected while True: - dealer, _, *frames = await router_socket.recv_multipart() + dealer, _, frame = await router_socket.recv_multipart() await router_socket.send_multipart([dealer, b"", ACK_MSG]) dealer = dealer.decode("utf-8") - for frame in frames: - frame = frame.decode("utf-8") - assert dealer.startswith("client-") - if frame == CONNECT_MSG: - connected = True - elif frame == DISCONNECT_MSG: - connected = False - return - else: - event = event_from_json(frame) - assert connected - assert type(event) is EEUserCancel + frame = frame.decode("utf-8") + if frame == CONNECT_MSG: + connected = True + elif frame == DISCONNECT_MSG: + connected = False + return + else: + event = event_from_json(frame) + assert connected + assert type(event) is EEUserCancel websocket_server_task = asyncio.create_task( async_zmq_server(unused_tcp_port, mock_event_handler)