Skip to content

Commit

Permalink
Make find_available ports work with zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 13, 2024
1 parent 6c5b21b commit 932c425
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
15 changes: 9 additions & 6 deletions src/ert/ensemble_evaluator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,22 @@ def __init__(
custom_host: typing.Optional[str] = None,
) -> None:
self._socket_handle = find_available_socket(
custom_range=custom_port_range, custom_host=custom_host
custom_range=custom_port_range,
custom_host=custom_host,
will_close_then_reopen_socket=True,
)
host, port = self._socket_handle.getsockname()
self.host = host
self.pub_sub_port = port

self._socket_handle = find_available_socket(
custom_range=custom_port_range,
custom_host=custom_host,
will_close_then_reopen_socket=True,
)
host, port = self._socket_handle.getsockname()
self.push_pull_port = port

# self.protocol = "wss" if generate_cert else "ws"
# self.url = f"{self.protocol}://{host}:{port}"
# self.client_uri = f"{self.url}/client"
# self.dispatch_uri = f"{self.url}/dispatch"

if generate_cert:
cert, key, pw = _generate_certificate(host)
else:
Expand Down
16 changes: 11 additions & 5 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,22 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig):

async def _initialize_zmq(self) -> None:
self._zmq_context = zmq.asyncio.Context() # type: ignore
self._pull_socket: zmq.asyncio.Socket = self._zmq_context.socket(zmq.PULL)
self._pull_socket.bind(f"tcp://*:{self._config.push_pull_port}")
self._publisher_socket: zmq.asyncio.Socket = self._zmq_context.socket(zmq.PUB)
self._publisher_socket.bind(f"tcp://*:{self._config.pub_sub_port}")
try:
self._pull_socket: zmq.asyncio.Socket = self._zmq_context.socket(zmq.PULL)
self._pull_socket.bind(f"tcp://*:{self._config.push_pull_port}")
self._publisher_socket: zmq.asyncio.Socket = self._zmq_context.socket(
zmq.PUB
)
self._publisher_socket.bind(f"tcp://*:{self._config.pub_sub_port}")
except zmq.error.ZMQError as e:
logger.error(f"ZMQ error: {e}")
raise
logger.error("ZMQ initialized")

async def _publisher(self) -> None:
while True:
event = await self._events_to_send.get()
self._publisher_socket.send_json(event_to_json(event))
await self._publisher_socket.send_string(event_to_json(event))
self._events_to_send.task_done()

async def _append_message(self, snapshot_update_event: EnsembleSnapshot) -> None:
Expand Down
5 changes: 5 additions & 0 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import json
import logging
import ssl
import uuid
Expand Down Expand Up @@ -132,10 +133,14 @@ async def _receiver(self) -> None:

self._push_socket = self._zmq_context.socket(zmq.PUSH)
self._push_socket.connect(self._ee_con_info.push_pull_uri)
self._connected.set()

while True:
try:
raw_msg = await self._listen_socket.recv_string()
raw_msg = json.loads(raw_msg)
event = event_from_json(raw_msg)
print(f"monitor-{self._id} received event: {event}")
await self._event_queue.put(event)
except (zmq.ZMQError, asyncio.CancelledError) as exc:
# Handle disconnection or other ZMQ errors (reconnect or log)
Expand Down
1 change: 1 addition & 0 deletions src/ert/shared/net_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def _bind_socket(

if will_close_then_reopen_socket:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
else:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 0)

Expand Down

0 comments on commit 932c425

Please sign in to comment.