Skip to content

Commit

Permalink
Remove unused-in-production HTEX interchange default values (#3465)
Browse files Browse the repository at this point in the history
This PR makes all parameters to the Interchange class into mandatory
keyword-only arguments.

The removed defaults were not used in production use, because they were
all specified explicitly in parsl/executors/high_throughput/executor.py too.

The single exception to this was client_address, which was defaulted in the
interchange and never specified by the exeuctor. This PR moves that
default into executor.py too, to work like all the other defaults.

See similar changes to the process worker pool, PR #2973, for more detailed
justification.

test_zmq_binding.py is the only test which instantiates Interchange objects
directly (rather than testing the executor as a whole) and this PR modifies
that test to explicitly specify all interchange parameters rather than
relying on the otherwise-unused defaults.
  • Loading branch information
benclifford authored Jun 10, 2024
1 parent 6e8358f commit b9aa3dd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ def _start_local_interchange_process(self):
get the worker task and result ports that the interchange has bound to.
"""
self.interchange_proc = ForkProcess(target=interchange.starter,
kwargs={"client_ports": (self.outgoing_q.port,
kwargs={"client_address": "127.0.0.1",
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
"interchange_address": self.address,
Expand Down
41 changes: 21 additions & 20 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,19 @@ class Interchange:
3. Detect workers that have failed using heartbeats
"""
def __init__(self,
client_address: str = "127.0.0.1",
interchange_address: Optional[str] = None,
client_ports: Tuple[int, int, int] = (50055, 50056, 50057),
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Tuple[int, int] = (54000, 55000),
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
heartbeat_threshold: int = 60,
logdir: str = ".",
logging_level: int = logging.INFO,
poll_period: int = 10,
cert_dir: Optional[str] = None,
*,
client_address: str,
interchange_address: Optional[str],
client_ports: Tuple[int, int, int],
worker_ports: Optional[Tuple[int, int]],
worker_port_range: Tuple[int, int],
hub_address: Optional[str],
hub_zmq_port: Optional[int],
heartbeat_threshold: int,
logdir: str,
logging_level: int,
poll_period: int,
cert_dir: Optional[str],
) -> None:
"""
Parameters
Expand All @@ -92,34 +93,34 @@ def __init__(self,
The ports at which the client can be reached
worker_ports : tuple(int, int)
The specific two ports at which workers will connect to the Interchange. Default: None
The specific two ports at which workers will connect to the Interchange.
worker_port_range : tuple(int, int)
The interchange picks ports at random from the range which will be used by workers.
This is overridden when the worker_ports option is set. Default: (54000, 55000)
This is overridden when the worker_ports option is set.
hub_address : str
The IP address at which the interchange can send info about managers to when monitoring is enabled.
Default: None (meaning monitoring disabled)
When None, monitoring is disabled.
hub_zmq_port : str
The port at which the interchange can send info about managers to when monitoring is enabled.
Default: None (meaning monitoring disabled)
When None, monitoring is disabled.
heartbeat_threshold : int
Number of seconds since the last heartbeat after which worker is considered lost.
logdir : str
Parsl log directory paths. Logs and temp files go here. Default: '.'
Parsl log directory paths. Logs and temp files go here.
logging_level : int
Logging level as defined in the logging module. Default: logging.INFO
Logging level as defined in the logging module.
poll_period : int
The main thread polling period, in milliseconds. Default: 10ms
The main thread polling period, in milliseconds.
cert_dir : str | None
Path to the certificate directory. Default: None
Path to the certificate directory.
"""
self.cert_dir = cert_dir
self.logdir = logdir
Expand Down
28 changes: 22 additions & 6 deletions parsl/tests/test_htex/test_zmq_binding.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import pathlib
from typing import Optional
from unittest import mock
Expand All @@ -10,6 +11,21 @@
from parsl.executors.high_throughput.interchange import Interchange


def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange:
return Interchange(interchange_address=interchange_address,
cert_dir=cert_dir,
client_address="127.0.0.1",
client_ports=(50055, 50056, 50057),
worker_ports=None,
worker_port_range=(54000, 55000),
hub_address=None,
hub_zmq_port=None,
heartbeat_threshold=60,
logdir=".",
logging_level=logging.INFO,
poll_period=10)


@pytest.fixture
def encrypted(request: pytest.FixtureRequest):
if hasattr(request, "param"):
Expand All @@ -31,7 +47,7 @@ def test_interchange_curvezmq_sockets(
mock_socket: mock.MagicMock, cert_dir: Optional[str], encrypted: bool
):
address = "127.0.0.1"
ix = Interchange(interchange_address=address, cert_dir=cert_dir)
ix = make_interchange(interchange_address=address, cert_dir=cert_dir)
assert isinstance(ix.zmq_context, curvezmq.ServerContext)
assert ix.zmq_context.encrypted is encrypted
assert mock_socket.call_count == 5
Expand All @@ -40,7 +56,7 @@ def test_interchange_curvezmq_sockets(
@pytest.mark.local
@pytest.mark.parametrize("encrypted", (True, False), indirect=True)
def test_interchange_binding_no_address(cert_dir: Optional[str]):
ix = Interchange(cert_dir=cert_dir)
ix = make_interchange(interchange_address=None, cert_dir=cert_dir)
assert ix.interchange_address == "*"


Expand All @@ -49,7 +65,7 @@ def test_interchange_binding_no_address(cert_dir: Optional[str]):
def test_interchange_binding_with_address(cert_dir: Optional[str]):
# Using loopback address
address = "127.0.0.1"
ix = Interchange(interchange_address=address, cert_dir=cert_dir)
ix = make_interchange(interchange_address=address, cert_dir=cert_dir)
assert ix.interchange_address == address


Expand All @@ -60,7 +76,7 @@ def test_interchange_binding_with_non_ipv4_address(cert_dir: Optional[str]):
# Confirm that a ipv4 address is required
address = "localhost"
with pytest.raises(zmq.error.ZMQError):
Interchange(interchange_address=address, cert_dir=cert_dir)
make_interchange(interchange_address=address, cert_dir=cert_dir)


@pytest.mark.local
Expand All @@ -69,15 +85,15 @@ def test_interchange_binding_bad_address(cert_dir: Optional[str]):
"""Confirm that we raise a ZMQError when a bad address is supplied"""
address = "550.0.0.0"
with pytest.raises(zmq.error.ZMQError):
Interchange(interchange_address=address, cert_dir=cert_dir)
make_interchange(interchange_address=address, cert_dir=cert_dir)


@pytest.mark.local
@pytest.mark.parametrize("encrypted", (True, False), indirect=True)
def test_limited_interface_binding(cert_dir: Optional[str]):
"""When address is specified the worker_port would be bound to it rather than to 0.0.0.0"""
address = "127.0.0.1"
ix = Interchange(interchange_address=address, cert_dir=cert_dir)
ix = make_interchange(interchange_address=address, cert_dir=cert_dir)
ix.worker_result_port
proc = psutil.Process()
conns = proc.connections(kind="tcp")
Expand Down

0 comments on commit b9aa3dd

Please sign in to comment.