Skip to content

Commit

Permalink
Update to connection
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Dec 1, 2024
1 parent be0e25c commit 3377024
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
35 changes: 27 additions & 8 deletions src/_ert/forward_model_runner/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def term(self):

def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None:
self.send("DISCONNECT")
self.socket.disconnect(self.url)
self.term()

async def __aenter__(self) -> Self:
Expand All @@ -49,6 +50,7 @@ async def __aexit__(
self, exc_type: Any, exc_value: Any, exc_traceback: Any
) -> None:
await self._send("DISCONNECT")
self.socket.disconnect(self.url)
self.term()

def __init__(
Expand Down Expand Up @@ -84,17 +86,34 @@ def __init__(
self._max_retries = max_retries
self.loop = new_event_loop()

# async def reconnect(self) -> None:
# self._connected = False
# self.socket.connect(self.url)
# try:
# # await self._send("CONNECT")
# connect_msg = await self.socket.recv()
# if connect_msg.decode() == "CONNECT":
# self._connected = True
# else:
# raise ClientConnectionError("No connect from the evaluator")
# except ClientConnectionError as exc:
# logger.warning("Failed to get acknowledgment on the connect!")
# raise ClientConnectionError(
# "Connection to evaluator not established!"
# ) from exc

async def reconnect(self) -> None:
self._connected = False
self.socket.connect(self.url)
await self.socket.send_multipart([b"", b"CONNECT"])
try:
await self._send("CONNECT")
self._connected = True
except ClientConnectionError as exc:
logger.warning("Failed to get acknowledgment on the connect!")
raise ClientConnectionError(
"Connection to evaluator not established!"
) from exc
_, ack = await asyncio.wait_for(
self.socket.recv_multipart(), timeout=self._connection_timeout
)
if ack.decode() != "ACK":
raise asyncio.TimeoutError("No Ack for connect")
except asyncio.TimeoutError:
logger.warning("Failed to get acknowledgment on dealer connect!")
raise

def send(self, messages: str | list[str]) -> None:
self.loop.run_until_complete(self._send(messages))
Expand Down
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async def __aenter__(self) -> "Monitor":
async def _term(self) -> None:
if self._receiver_task:
await self._socket.send_multipart([b"", b"DISCONNECT"])
self._socket.disconnect(self._ee_con_info.router_uri)
if not self._receiver_task.done():
self._receiver_task.cancel()
await asyncio.gather(
Expand Down

0 comments on commit 3377024

Please sign in to comment.