Skip to content

Commit

Permalink
revert localhost back to ::1 in some circumstances and add ipv6 opt
Browse files Browse the repository at this point in the history
  • Loading branch information
LeiGlobus committed Nov 26, 2024
1 parent 453dc3a commit 617c521
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def __init__(
if linger is not None:
self.zmq_socket.setsockopt(zmq.LINGER, linger)

self.zmq_socket.setsockopt(zmq.IPV6, True)

# all zmq setsockopt calls must be done before bind/connect is called
if self.mode == "server":
self.zmq_socket.bind(f"tcp://*:{port}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def __init__(
# Tuning info
prefetch_capacity=10,
provider=LocalProvider(),
address="127.0.0.1",
address="::1",
worker_ports=None,
worker_port_range=(54000, 55000),
interchange_port_range=(55000, 56000),
Expand Down Expand Up @@ -379,13 +379,13 @@ def start(
self.endpoint_id = endpoint_id

self.outgoing_q = zmq_pipes.TasksOutgoing(
"localhost", self.interchange_port_range
"::1", self.interchange_port_range
)
self.incoming_q = zmq_pipes.ResultsIncoming(
"localhost", self.interchange_port_range
"::1", self.interchange_port_range
)
self.command_client = zmq_pipes.CommandClient(
"localhost", self.interchange_port_range
"::1", self.interchange_port_range
)

self.is_alive = True
Expand Down Expand Up @@ -454,7 +454,7 @@ def _start_local_interchange_process(self):
name="Engine-Interchange",
args=(comm_q,),
kwargs={
"client_address": "localhost", # engine and ix are on same node
"client_address": "::1", # engine and ix are on same node
"client_ports": (
self.outgoing_q.port,
self.incoming_q.port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ def __init__(
worker_mode=None,
cold_routing_interval=10.0,
scaling_enabled=True,
client_address="localhost",
interchange_address="localhost",
client_address="127.0.0.1",
interchange_address="127.0.0.1",
client_ports: tuple[int, int, int] = (50055, 50056, 50057),
worker_ports=None,
worker_port_range=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(
# Linger is set to 0, so that the manager can exit even when there might be
# messages in the pipe
self.task_incoming.setsockopt(zmq.LINGER, 0)
self.task_incoming.setsockopt(zmq.IPV6, True)
self.task_incoming.connect(task_q_url)

self.logdir = logdir
Expand All @@ -179,6 +180,7 @@ def __init__(
self.result_outgoing = self.context.socket(zmq.DEALER)
self.result_outgoing.setsockopt(zmq.IDENTITY, uid.encode("utf-8"))
self.result_outgoing.setsockopt(zmq.LINGER, 0)
self.result_outgoing.setsockopt(zmq.IPV6, True)
self.result_outgoing.connect(result_q_url)

log.info("Manager connected")
Expand Down Expand Up @@ -213,6 +215,7 @@ def __init__(

self.funcx_task_socket = self.context.socket(zmq.ROUTER)
self.funcx_task_socket.set_hwm(0)
self.funcx_task_socket.setsockopt(zmq.IPV6, True)
self.address = "localhost"
self.worker_port = self.funcx_task_socket.bind_to_random_port(
"tcp://*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(

self.task_socket = self.context.socket(zmq.DEALER)
self.task_socket.setsockopt(zmq.IDENTITY, self.identity)
self.task_socket.setsockopt(zmq.IPV6, True)

log.info(f"Trying to connect to : tcp://{self.address}:{self.port}")
self.task_socket.connect(f"tcp://{self.address}:{self.port}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _zmq_create_socket_port(context: zmq.Context, ip_address: str | int, port_ra
sock.set_hwm(0)
# This option should work for both IPv4 and IPv6...?
# May not work until Parsl is updated?
# sock.setsockopt(zmq.IPV6, True)
sock.setsockopt(zmq.IPV6, True)

port = sock.bind_to_random_port(
f"tcp://{_zmq_canonicalize_address(ip_address)}",
Expand Down
4 changes: 2 additions & 2 deletions compute_endpoint/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _runner(engine_type: t.Type[GlobusComputeEngineBase], **kwargs):
k = dict(max_workers=2)
elif engine_type is engines.GlobusComputeEngine:
k = dict(
address="localhost",
address="::1",
heartbeat_period=engine_heartbeat,
heartbeat_threshold=2,
job_status_kwargs=dict(max_idletime=0, strategy_period=0.1),
Expand All @@ -153,7 +153,7 @@ def _runner(engine_type: t.Type[GlobusComputeEngineBase], **kwargs):
"""

k = dict(
address="localhost",
address="::1",
heartbeat_period=engine_heartbeat,
heartbeat_threshold=1,
mpi_launcher="mpiexec",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ def test_repeated_fail(mock_gce, ez_pack_task):


def test_default_retries_is_0():
engine = GlobusComputeEngine(address="localhost")
engine = GlobusComputeEngine(address="::1")
assert engine.max_retries_on_system_failure == 0, "Users must knowingly opt-in"

0 comments on commit 617c521

Please sign in to comment.