Skip to content

Commit

Permalink
fiddle with interchange default values
Browse files Browse the repository at this point in the history
there are default values in the interchange code, but they are all specified in the executor code too, so these defautls will
never be used. remove them as misleading.

see similar changes to process worker pool, PR #2973, for more detailed justification

needs to change zmq sockets test because that assumes the arbitrary defaults are present.
which is no longer the case.
but if you want to initialize an interchange that requires you to specify all this stuff, and want some arbitrary values,
then make those arbitrary values yourself.

client address parameter is now supplied by the executor - it was not before, and so the default/hard-coded value
now lives in the executor, not the interchange
  • Loading branch information
benclifford committed May 28, 2024
1 parent 0ad1587 commit f56c92c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ def _start_local_interchange_process(self):
get the worker task and result ports that the interchange has bound to.
"""
self.interchange_proc = ForkProcess(target=interchange.starter,
kwargs={"client_ports": (self.outgoing_q.port,
kwargs={"client_address": "127.0.0.1",
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
"interchange_address": self.address,
Expand Down
41 changes: 21 additions & 20 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,19 @@ class Interchange:
3. Detect workers that have failed using heartbeats
"""
def __init__(self,
client_address: str = "127.0.0.1",
interchange_address: Optional[str] = None,
client_ports: Tuple[int, int, int] = (50055, 50056, 50057),
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Tuple[int, int] = (54000, 55000),
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
heartbeat_threshold: int = 60,
logdir: str = ".",
logging_level: int = logging.INFO,
poll_period: int = 10,
cert_dir: Optional[str] = None,
*,
client_address: str,
interchange_address: Optional[str],
client_ports: Tuple[int, int, int],
worker_ports: Optional[Tuple[int, int]],
worker_port_range: Tuple[int, int],
hub_address: Optional[str],
hub_zmq_port: Optional[int],
heartbeat_threshold: int,
logdir: str,
logging_level: int,
poll_period: int,
cert_dir: Optional[str],
) -> None:
"""
Parameters
Expand All @@ -94,34 +95,34 @@ def __init__(self,
The ports at which the client can be reached
worker_ports : tuple(int, int)
The specific two ports at which workers will connect to the Interchange. Default: None
The specific two ports at which workers will connect to the Interchange.
worker_port_range : tuple(int, int)
The interchange picks ports at random from the range which will be used by workers.
This is overridden when the worker_ports option is set. Default: (54000, 55000)
This is overridden when the worker_ports option is set.
hub_address : str
The IP address at which the interchange can send info about managers to when monitoring is enabled.
Default: None (meaning monitoring disabled)
When None, monitoring is disabled.
hub_zmq_port : str
The port at which the interchange can send info about managers to when monitoring is enabled.
Default: None (meaning monitoring disabled)
When None, monitoring is disabled.
heartbeat_threshold : int
Number of seconds since the last heartbeat after which worker is considered lost.
logdir : str
Parsl log directory paths. Logs and temp files go here. Default: '.'
Parsl log directory paths. Logs and temp files go here.
logging_level : int
Logging level as defined in the logging module. Default: logging.INFO
Logging level as defined in the logging module.
poll_period : int
The main thread polling period, in milliseconds. Default: 10ms
The main thread polling period, in milliseconds.
cert_dir : str | None
Path to the certificate directory. Default: None
Path to the certificate directory.
"""
self.cert_dir = cert_dir
self.logdir = logdir
Expand Down
28 changes: 22 additions & 6 deletions parsl/tests/test_htex/test_zmq_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional
from unittest import mock

import logging
import psutil
import pytest
import zmq
Expand All @@ -10,6 +11,21 @@
from parsl.executors.high_throughput.interchange import Interchange


def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange:
return Interchange(interchange_address=interchange_address,
cert_dir=cert_dir,
client_address="127.0.0.1",
client_ports=(50055, 50056, 50057),
worker_ports=None,
worker_port_range=(54000, 55000),
hub_address=None,
hub_zmq_port=None,
heartbeat_threshold=60,
logdir=".",
logging_level=logging.INFO,
poll_period=10)


@pytest.fixture
def encrypted(request: pytest.FixtureRequest):
if hasattr(request, "param"):
Expand All @@ -31,7 +47,7 @@ def test_interchange_curvezmq_sockets(
mock_socket: mock.MagicMock, cert_dir: Optional[str], encrypted: bool
):
address = "127.0.0.1"
ix = Interchange(interchange_address=address, cert_dir=cert_dir)
ix = make_interchange(interchange_address=address, cert_dir=cert_dir)
assert isinstance(ix.zmq_context, curvezmq.ServerContext)
assert ix.zmq_context.encrypted is encrypted
assert mock_socket.call_count == 5
Expand All @@ -40,7 +56,7 @@ def test_interchange_curvezmq_sockets(
@pytest.mark.local
@pytest.mark.parametrize("encrypted", (True, False), indirect=True)
def test_interchange_binding_no_address(cert_dir: Optional[str]):
ix = Interchange(cert_dir=cert_dir)
ix = make_interchange(interchange_address=None, cert_dir=cert_dir)
assert ix.interchange_address == "*"


Expand All @@ -49,7 +65,7 @@ def test_interchange_binding_no_address(cert_dir: Optional[str]):
def test_interchange_binding_with_address(cert_dir: Optional[str]):
# Using loopback address
address = "127.0.0.1"
ix = Interchange(interchange_address=address, cert_dir=cert_dir)
ix = make_interchange(interchange_address=address, cert_dir=cert_dir)
assert ix.interchange_address == address


Expand All @@ -60,7 +76,7 @@ def test_interchange_binding_with_non_ipv4_address(cert_dir: Optional[str]):
# Confirm that a ipv4 address is required
address = "localhost"
with pytest.raises(zmq.error.ZMQError):
Interchange(interchange_address=address, cert_dir=cert_dir)
make_interchange(interchange_address=address, cert_dir=cert_dir)


@pytest.mark.local
Expand All @@ -69,15 +85,15 @@ def test_interchange_binding_bad_address(cert_dir: Optional[str]):
"""Confirm that we raise a ZMQError when a bad address is supplied"""
address = "550.0.0.0"
with pytest.raises(zmq.error.ZMQError):
Interchange(interchange_address=address, cert_dir=cert_dir)
make_interchange(interchange_address=address, cert_dir=cert_dir)


@pytest.mark.local
@pytest.mark.parametrize("encrypted", (True, False), indirect=True)
def test_limited_interface_binding(cert_dir: Optional[str]):
"""When address is specified the worker_port would be bound to it rather than to 0.0.0.0"""
address = "127.0.0.1"
ix = Interchange(interchange_address=address, cert_dir=cert_dir)
ix = make_interchange(interchange_address=address, cert_dir=cert_dir)
ix.worker_result_port
proc = psutil.Process()
conns = proc.connections(kind="tcp")
Expand Down

0 comments on commit f56c92c

Please sign in to comment.