Skip to content

Commit

Permalink
WIP: test ipc protocol for local driver
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Dec 9, 2024
1 parent c8537dc commit 5d881ac
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
35 changes: 18 additions & 17 deletions src/ert/ensemble_evaluator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ssl
import tempfile
import typing
import uuid
import warnings
from base64 import b64encode
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -108,30 +109,30 @@ def __init__(
use_token: bool = True,
generate_cert: bool = True,
custom_host: typing.Optional[str] = None,
localhost: bool = True,
) -> None:
self._socket_handle = find_available_socket(
custom_range=custom_port_range,
custom_host=custom_host,
will_close_then_reopen_socket=True,
)
host, port = self._socket_handle.getsockname()
self.host = host
self.router_port = port
self.url = f"tcp://{self.host}:{self.router_port}"
self.host: typing.Optional[str] = None
self.router_port: typing.Optional[int] = None
self.url = f"ipc:///tmp/socket-{uuid.uuid4().hex[:8]}"
self.token: typing.Optional[str] = None

self.server_public_key: typing.Optional[bytes] = None
self.server_secret_key: typing.Optional[bytes] = None

if generate_cert:
cert, key, pw = _generate_certificate(host)
if not localhost:
self._socket_handle = find_available_socket(
custom_range=custom_port_range,
custom_host=custom_host,
will_close_then_reopen_socket=True,
)
self.host, self.router_port = self._socket_handle.getsockname()
self.url = f"tcp://{self.host}:{self.router_port}"

if use_token:
self.server_public_key, self.server_secret_key = zmq.curve_keypair()
self.token = self.server_public_key.decode("utf-8")
else:
cert, key, pw = None, None, None
self.cert = cert
self._key: Optional[bytes] = key
self._key_pw = pw
self.cert: typing.Optional[str] = None
self._key: Optional[bytes] = None
self._key_pw: Optional[bytes] = None

def get_socket(self) -> socket.socket:
return self._socket_handle.dup()
Expand Down
11 changes: 6 additions & 5 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,13 @@ async def _server(self) -> None:
self._router_socket.curve_server = True

# Attempt to bind the ROUTER socket
self._router_socket.bind(f"tcp://*:{self._config.router_port}")
# self._router_socket.bind(f"tcp://*:{self._config.router_port}")
if self._config.router_port:
self._router_socket.bind(f"tcp://*:{self._config.router_port}")
else:
self._router_socket.bind(self._config.url)
self._server_started.set()
print(
"ROUTER listens on",
f"tcp://*:{self._config.router_port} {self._config.server_public_key=} {self._config.server_secret_key=}",
)
print(f"ROUTER listens on {self._config.url}")
except zmq.error.ZMQError as e:
logger.error(f"ZMQ error encountered {e} during evaluator initialization")
print(f"ZMQ error encountered {e} during evaluator initialization")
Expand Down

0 comments on commit 5d881ac

Please sign in to comment.