Skip to content

Commit

Permalink
Dealers are bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 18, 2024
1 parent c180880 commit 25b8079
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/_ert/forward_model_runner/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def send(self, messages: str | list[str]) -> None:
)
try:
_, ack = self.socket.recv_multipart()
logger.debug(f"Got acknowledgment: {ack}")
if ack.decode() == "ACK":
break
logger.warning(
Expand Down
4 changes: 2 additions & 2 deletions src/ert/ensemble_evaluator/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ async def evaluate(
ce_unary_send_method_name,
partialmethod(
self.__class__.send_event,
self._config.get_connection_info().push_pull_uri,
self._config.get_connection_info().router_uri,
token=self._config.token,
cert=self._config.cert,
),
Expand Down Expand Up @@ -277,7 +277,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
max_running=self._queue_config.max_running,
submit_sleep=self._queue_config.submit_sleep,
ens_id=self.id_,
ee_uri=self._config.get_connection_info().push_pull_uri,
ee_uri=self._config.get_connection_info().router_uri,
ee_cert=self._config.cert,
ee_token=self._config.token,
)
Expand Down
26 changes: 19 additions & 7 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig):
self._batching_interval: int = 2
self._complete_batch: asyncio.Event = asyncio.Event()
self._zmq_context: zmq.asyncio.Context | None = None
self._clients_connected: set[str] = set()
self._clients_connected: set[bytes] = set()
self._dispatchers_connected: asyncio.Queue[None] = asyncio.Queue()

async def _initialize_zmq(self) -> None:
Expand All @@ -92,6 +92,18 @@ async def _initialize_zmq(self) -> None:
self._router_socket.curve_server = True
self._router_socket.bind(f"tcp://*:{self._config.router_port}")

# Monitor the socket
self._router_socket.monitor("inproc://router-monitor", zmq.EVENT_ALL)
monitor_socket = self._zmq_context.socket(zmq.PAIR)
monitor_socket.connect("inproc://router-monitor")

# Check for binding event
if monitor_socket.poll(5000): # Wait for up to 5 seconds
event = await monitor_socket.recv_multipart()
print(f"Monitor Event: {event}")
else:
print("No monitor event within 5 seconds. Something may be wrong.")

except zmq.error.ZMQError as e:
logger.error(f"ZMQ error: {e}")
raise
Expand Down Expand Up @@ -217,15 +229,16 @@ def ensemble(self) -> Ensemble:
async def listen_for_messages(self) -> None:
while True:
try:
sender, _, *frames = await self._router_socket.recv_multipart()
sender = sender.decode("utf-8")
dealer, _, *frames = await self._router_socket.recv_multipart()
sender = dealer.decode("utf-8")
print(f"Got message from {sender} {frames}")
if sender.startswith("client"):
for frame in frames:
raw_msg = frame.decode("utf-8")
if raw_msg == "CONNECT":
self._clients_connected.add(sender)
self._clients_connected.add(dealer)
elif raw_msg == "DISCONNECT":
self._clients_connected.remove(sender)
self._clients_connected.remove(dealer)
else:
event = event_from_json(raw_msg)
if type(event) is EEUserCancel:
Expand All @@ -235,7 +248,7 @@ async def listen_for_messages(self) -> None:
logger.debug("Client signalled done.")
self.stop()
elif sender.startswith("dispatch"):
await self._router_socket.send_multipart([sender, b"", b"ACK"])
await self._router_socket.send_multipart([dealer, b"", b"ACK"])
for frame in frames:
raw_msg = frame.decode("utf-8")
event = dispatch_event_from_json(raw_msg)
Expand Down Expand Up @@ -277,7 +290,6 @@ async def _server(self) -> None:
event = EETerminated(ensemble=self._ensemble.id_)
await self._events_to_send.put(event)
await self._events_to_send.join()
await self._clients_connected.join()
self._router_socket.close()
logger.debug("Async server exiting.")

Expand Down

0 comments on commit 25b8079

Please sign in to comment.