diff --git a/pyproject.toml b/pyproject.toml index a2bf4072579..074a5a46b72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,7 +73,6 @@ dependencies = [ "typing_extensions>=4.5", "uvloop", "uvicorn >= 0.17.0", - "websockets", "xarray", "xtgeo >= 3.3.0", "zmq", diff --git a/src/ert/ensemble_evaluator/config.py b/src/ert/ensemble_evaluator/config.py index 55c721c5846..c75caf5f0cc 100644 --- a/src/ert/ensemble_evaluator/config.py +++ b/src/ert/ensemble_evaluator/config.py @@ -99,27 +99,7 @@ def _generate_certificate( class EvaluatorServerConfig: """ - This class is responsible for identifying a host:port-combo and then provide - low-level sockets bound to said combo. The problem is that these sockets may - be closed by underlying code, while the EvaluatorServerConfig-instance is - still alive and expected to provide a bound low-level socket. Thus we risk - that the host:port is hijacked by another process in the meantime. - - To prevent this, we keep a handle to the bound socket and every time - a socket is requested we return a duplicate of this. The duplicate will be - bound similarly to the handle, but when closed the handle stays open and - holds the port. - - In particular, the websocket-server closes the websocket when exiting a - context: - - https://github.com/aaugustin/websockets/blob/c439f1d52aafc05064cc11702d1c3014046799b0/src/websockets/legacy/server.py#L890 - - and digging into the cpython-implementation of asyncio, we see that causes - the asyncio code to also close the underlying socket: - - https://github.com/python/cpython/blob/b34dd58fee707b8044beaf878962a6fa12b304dc/Lib/asyncio/selector_events.py#L607-L611 - + TODO zmq setup """ def __init__( diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index 43df3b0cbdb..522ecaadc7d 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -90,14 +90,13 @@ async def _initialize_zmq(self) -> None: self._router_socket: zmq.asyncio.Socket = self._zmq_context.socket( zmq.ROUTER ) - self._router_socket.curve_secretkey = self._config.server_secret_key - self._router_socket.curve_publickey = self._config.server_public_key - self._router_socket.curve_server = True + if self._config.server_public_key and self._config.server_secret_key: + self._router_socket.curve_secretkey = self._config.server_secret_key + self._router_socket.curve_publickey = self._config.server_public_key + self._router_socket.curve_server = True # Attempt to bind the ROUTER socket - logger.info(f"Attempting to bind to tcp://*:{self._config.router_port}") self._router_socket.bind(f"tcp://*:{self._config.router_port}") - logger.info(f"Successfully bound to tcp://*:{self._config.router_port}") except zmq.error.ZMQError as e: logger.error(f"ZMQ error during initialization: {e}") diff --git a/src/ert/logging/logger.conf b/src/ert/logging/logger.conf index 20a5bf589cd..69c90c29d17 100644 --- a/src/ert/logging/logger.conf +++ b/src/ert/logging/logger.conf @@ -33,7 +33,7 @@ loggers: level: INFO subscript: level: INFO - websockets: + zmq: level: INFO diff --git a/tests/ert/ui_tests/cli/test_cli.py b/tests/ert/ui_tests/cli/test_cli.py index ba070c3acf7..6ccb314aebf 100644 --- a/tests/ert/ui_tests/cli/test_cli.py +++ b/tests/ert/ui_tests/cli/test_cli.py @@ -11,8 +11,8 @@ import numpy as np import pandas as pd import pytest -import websockets.exceptions import xtgeo +import zmq from resdata.summary import Summary import _ert.threading @@ -20,11 +20,7 @@ from _ert.forward_model_runner.client import Client from ert import LibresFacade, ensemble_evaluator from ert.cli.main import ErtCliError -from ert.config import ( - ConfigValidationError, - ConfigWarning, - ErtConfig, -) +from ert.config import ConfigValidationError, ConfigWarning, ErtConfig from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EnsembleEvaluator from ert.mode_definitions import ( @@ -949,7 +945,7 @@ def test_that_connection_errors_do_not_effect_final_result( monkeypatch.setattr(Job, "DEFAULT_CHECKSUM_TIMEOUT", 0) def raise_connection_error(*args, **kwargs): - raise websockets.exceptions.ConnectionClosedError(None, None) + raise zmq.error.ZMQError(None, None) with patch( "ert.ensemble_evaluator.evaluator.dispatch_event_from_json", diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py index 5b845a6e3d8..3a8935b26a8 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py @@ -1,11 +1,9 @@ import asyncio -import contextlib import os from contextlib import asynccontextmanager from unittest.mock import MagicMock import pytest -from websockets.exceptions import ConnectionClosed from _ert.events import EESnapshot, EESnapshotUpdate, EETerminated from ert.config import QueueConfig @@ -93,16 +91,13 @@ async def test_run_and_cancel_legacy_ensemble( # and the ensemble is set to STOPPED monitor._receiver_timeout = 10.0 cancel = True - with contextlib.suppress( - ConnectionClosed - ): # monitor throws some variant of CC if dispatcher dies - async for event in monitor.track(heartbeat_interval=0.1): - # Cancel the ensemble upon the arrival of the first event - if cancel: - await monitor.signal_cancel() - cancel = False - if type(event) is EETerminated: - terminated_event = True + async for event in monitor.track(heartbeat_interval=0.1): + # Cancel the ensemble upon the arrival of the first event + if cancel: + await monitor.signal_cancel() + cancel = False + if type(event) is EETerminated: + terminated_event = True if terminated_event: assert evaluator._ensemble.status == state.ENSEMBLE_STATE_CANCELLED