Skip to content

Commit

Permalink
Remove last websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 20, 2024
1 parent e7ee737 commit bc2b82d
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 47 deletions.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ dependencies = [
"typing_extensions>=4.5",
"uvloop",
"uvicorn >= 0.17.0",
"websockets",
"xarray",
"xtgeo >= 3.3.0",
"zmq",
Expand Down
22 changes: 1 addition & 21 deletions src/ert/ensemble_evaluator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,7 @@ def _generate_certificate(

class EvaluatorServerConfig:
"""
This class is responsible for identifying a host:port-combo and then provide
low-level sockets bound to said combo. The problem is that these sockets may
be closed by underlying code, while the EvaluatorServerConfig-instance is
still alive and expected to provide a bound low-level socket. Thus we risk
that the host:port is hijacked by another process in the meantime.
To prevent this, we keep a handle to the bound socket and every time
a socket is requested we return a duplicate of this. The duplicate will be
bound similarly to the handle, but when closed the handle stays open and
holds the port.
In particular, the websocket-server closes the websocket when exiting a
context:
https://github.com/aaugustin/websockets/blob/c439f1d52aafc05064cc11702d1c3014046799b0/src/websockets/legacy/server.py#L890
and digging into the cpython-implementation of asyncio, we see that causes
the asyncio code to also close the underlying socket:
https://github.com/python/cpython/blob/b34dd58fee707b8044beaf878962a6fa12b304dc/Lib/asyncio/selector_events.py#L607-L611
TODO zmq setup
"""

def __init__(
Expand Down
9 changes: 4 additions & 5 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,13 @@ async def _initialize_zmq(self) -> None:
self._router_socket: zmq.asyncio.Socket = self._zmq_context.socket(
zmq.ROUTER
)
self._router_socket.curve_secretkey = self._config.server_secret_key
self._router_socket.curve_publickey = self._config.server_public_key
self._router_socket.curve_server = True
if self._config.server_public_key and self._config.server_secret_key:
self._router_socket.curve_secretkey = self._config.server_secret_key
self._router_socket.curve_publickey = self._config.server_public_key
self._router_socket.curve_server = True

# Attempt to bind the ROUTER socket
logger.info(f"Attempting to bind to tcp://*:{self._config.router_port}")
self._router_socket.bind(f"tcp://*:{self._config.router_port}")
logger.info(f"Successfully bound to tcp://*:{self._config.router_port}")

except zmq.error.ZMQError as e:
logger.error(f"ZMQ error during initialization: {e}")
Expand Down
2 changes: 1 addition & 1 deletion src/ert/logging/logger.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ loggers:
level: INFO
subscript:
level: INFO
websockets:
zmq:
level: INFO


Expand Down
10 changes: 3 additions & 7 deletions tests/ert/ui_tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@
import numpy as np
import pandas as pd
import pytest
import websockets.exceptions
import xtgeo
import zmq
from resdata.summary import Summary

import _ert.threading
import ert.shared
from _ert.forward_model_runner.client import Client
from ert import LibresFacade, ensemble_evaluator
from ert.cli.main import ErtCliError
from ert.config import (
ConfigValidationError,
ConfigWarning,
ErtConfig,
)
from ert.config import ConfigValidationError, ConfigWarning, ErtConfig
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EnsembleEvaluator
from ert.mode_definitions import (
Expand Down Expand Up @@ -949,7 +945,7 @@ def test_that_connection_errors_do_not_effect_final_result(
monkeypatch.setattr(Job, "DEFAULT_CHECKSUM_TIMEOUT", 0)

def raise_connection_error(*args, **kwargs):
raise websockets.exceptions.ConnectionClosedError(None, None)
raise zmq.error.ZMQError(None, None)

with patch(
"ert.ensemble_evaluator.evaluator.dispatch_event_from_json",
Expand Down
19 changes: 7 additions & 12 deletions tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import asyncio
import contextlib
import os
from contextlib import asynccontextmanager
from unittest.mock import MagicMock

import pytest
from websockets.exceptions import ConnectionClosed

from _ert.events import EESnapshot, EESnapshotUpdate, EETerminated
from ert.config import QueueConfig
Expand Down Expand Up @@ -93,16 +91,13 @@ async def test_run_and_cancel_legacy_ensemble(
# and the ensemble is set to STOPPED
monitor._receiver_timeout = 10.0
cancel = True
with contextlib.suppress(
ConnectionClosed
): # monitor throws some variant of CC if dispatcher dies
async for event in monitor.track(heartbeat_interval=0.1):
# Cancel the ensemble upon the arrival of the first event
if cancel:
await monitor.signal_cancel()
cancel = False
if type(event) is EETerminated:
terminated_event = True
async for event in monitor.track(heartbeat_interval=0.1):
# Cancel the ensemble upon the arrival of the first event
if cancel:
await monitor.signal_cancel()
cancel = False
if type(event) is EETerminated:
terminated_event = True

if terminated_event:
assert evaluator._ensemble.status == state.ENSEMBLE_STATE_CANCELLED
Expand Down

0 comments on commit bc2b82d

Please sign in to comment.