From b9aa3dd199fc882cb537d6a4ade5e3ac257bc2bc Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 23:36:30 +0200 Subject: [PATCH] Remove unused-in-production HTEX interchange default values (#3465) This PR makes all parameters to the Interchange class into mandatory keyword-only arguments. The removed defaults were not used in production use, because they were all specified explicitly in parsl/executors/high_throughput/executor.py too. The single exception to this was client_address, which was defaulted in the interchange and never specified by the exeuctor. This PR moves that default into executor.py too, to work like all the other defaults. See similar changes to the process worker pool, PR #2973, for more detailed justification. test_zmq_binding.py is the only test which instantiates Interchange objects directly (rather than testing the executor as a whole) and this PR modifies that test to explicitly specify all interchange parameters rather than relying on the otherwise-unused defaults. --- parsl/executors/high_throughput/executor.py | 3 +- .../executors/high_throughput/interchange.py | 41 ++++++++++--------- parsl/tests/test_htex/test_zmq_binding.py | 28 ++++++++++--- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index b5480e7937..2e20f41795 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -527,7 +527,8 @@ def _start_local_interchange_process(self): get the worker task and result ports that the interchange has bound to. """ self.interchange_proc = ForkProcess(target=interchange.starter, - kwargs={"client_ports": (self.outgoing_q.port, + kwargs={"client_address": "127.0.0.1", + "client_ports": (self.outgoing_q.port, self.incoming_q.port, self.command_client.port), "interchange_address": self.address, diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 4b3bab3563..764c9805a0 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -65,18 +65,19 @@ class Interchange: 3. Detect workers that have failed using heartbeats """ def __init__(self, - client_address: str = "127.0.0.1", - interchange_address: Optional[str] = None, - client_ports: Tuple[int, int, int] = (50055, 50056, 50057), - worker_ports: Optional[Tuple[int, int]] = None, - worker_port_range: Tuple[int, int] = (54000, 55000), - hub_address: Optional[str] = None, - hub_zmq_port: Optional[int] = None, - heartbeat_threshold: int = 60, - logdir: str = ".", - logging_level: int = logging.INFO, - poll_period: int = 10, - cert_dir: Optional[str] = None, + *, + client_address: str, + interchange_address: Optional[str], + client_ports: Tuple[int, int, int], + worker_ports: Optional[Tuple[int, int]], + worker_port_range: Tuple[int, int], + hub_address: Optional[str], + hub_zmq_port: Optional[int], + heartbeat_threshold: int, + logdir: str, + logging_level: int, + poll_period: int, + cert_dir: Optional[str], ) -> None: """ Parameters @@ -92,34 +93,34 @@ def __init__(self, The ports at which the client can be reached worker_ports : tuple(int, int) - The specific two ports at which workers will connect to the Interchange. Default: None + The specific two ports at which workers will connect to the Interchange. worker_port_range : tuple(int, int) The interchange picks ports at random from the range which will be used by workers. - This is overridden when the worker_ports option is set. Default: (54000, 55000) + This is overridden when the worker_ports option is set. hub_address : str The IP address at which the interchange can send info about managers to when monitoring is enabled. - Default: None (meaning monitoring disabled) + When None, monitoring is disabled. hub_zmq_port : str The port at which the interchange can send info about managers to when monitoring is enabled. - Default: None (meaning monitoring disabled) + When None, monitoring is disabled. heartbeat_threshold : int Number of seconds since the last heartbeat after which worker is considered lost. logdir : str - Parsl log directory paths. Logs and temp files go here. Default: '.' + Parsl log directory paths. Logs and temp files go here. logging_level : int - Logging level as defined in the logging module. Default: logging.INFO + Logging level as defined in the logging module. poll_period : int - The main thread polling period, in milliseconds. Default: 10ms + The main thread polling period, in milliseconds. cert_dir : str | None - Path to the certificate directory. Default: None + Path to the certificate directory. """ self.cert_dir = cert_dir self.logdir = logdir diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index eaf2e9731b..1194e632d0 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ b/parsl/tests/test_htex/test_zmq_binding.py @@ -1,3 +1,4 @@ +import logging import pathlib from typing import Optional from unittest import mock @@ -10,6 +11,21 @@ from parsl.executors.high_throughput.interchange import Interchange +def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange: + return Interchange(interchange_address=interchange_address, + cert_dir=cert_dir, + client_address="127.0.0.1", + client_ports=(50055, 50056, 50057), + worker_ports=None, + worker_port_range=(54000, 55000), + hub_address=None, + hub_zmq_port=None, + heartbeat_threshold=60, + logdir=".", + logging_level=logging.INFO, + poll_period=10) + + @pytest.fixture def encrypted(request: pytest.FixtureRequest): if hasattr(request, "param"): @@ -31,7 +47,7 @@ def test_interchange_curvezmq_sockets( mock_socket: mock.MagicMock, cert_dir: Optional[str], encrypted: bool ): address = "127.0.0.1" - ix = Interchange(interchange_address=address, cert_dir=cert_dir) + ix = make_interchange(interchange_address=address, cert_dir=cert_dir) assert isinstance(ix.zmq_context, curvezmq.ServerContext) assert ix.zmq_context.encrypted is encrypted assert mock_socket.call_count == 5 @@ -40,7 +56,7 @@ def test_interchange_curvezmq_sockets( @pytest.mark.local @pytest.mark.parametrize("encrypted", (True, False), indirect=True) def test_interchange_binding_no_address(cert_dir: Optional[str]): - ix = Interchange(cert_dir=cert_dir) + ix = make_interchange(interchange_address=None, cert_dir=cert_dir) assert ix.interchange_address == "*" @@ -49,7 +65,7 @@ def test_interchange_binding_no_address(cert_dir: Optional[str]): def test_interchange_binding_with_address(cert_dir: Optional[str]): # Using loopback address address = "127.0.0.1" - ix = Interchange(interchange_address=address, cert_dir=cert_dir) + ix = make_interchange(interchange_address=address, cert_dir=cert_dir) assert ix.interchange_address == address @@ -60,7 +76,7 @@ def test_interchange_binding_with_non_ipv4_address(cert_dir: Optional[str]): # Confirm that a ipv4 address is required address = "localhost" with pytest.raises(zmq.error.ZMQError): - Interchange(interchange_address=address, cert_dir=cert_dir) + make_interchange(interchange_address=address, cert_dir=cert_dir) @pytest.mark.local @@ -69,7 +85,7 @@ def test_interchange_binding_bad_address(cert_dir: Optional[str]): """Confirm that we raise a ZMQError when a bad address is supplied""" address = "550.0.0.0" with pytest.raises(zmq.error.ZMQError): - Interchange(interchange_address=address, cert_dir=cert_dir) + make_interchange(interchange_address=address, cert_dir=cert_dir) @pytest.mark.local @@ -77,7 +93,7 @@ def test_interchange_binding_bad_address(cert_dir: Optional[str]): def test_limited_interface_binding(cert_dir: Optional[str]): """When address is specified the worker_port would be bound to it rather than to 0.0.0.0""" address = "127.0.0.1" - ix = Interchange(interchange_address=address, cert_dir=cert_dir) + ix = make_interchange(interchange_address=address, cert_dir=cert_dir) ix.worker_result_port proc = psutil.Process() conns = proc.connections(kind="tcp")