From 617c5211885e1ef45e45f36f46a337045f353958 Mon Sep 17 00:00:00 2001 From: Lei Wang Date: Mon, 25 Nov 2024 19:26:39 -0500 Subject: [PATCH] revert localhost back to ::1 in some circumstances and add ipv6 opt --- .../globus_compute_endpoint/endpoint/taskqueue.py | 2 ++ .../engines/high_throughput/engine.py | 10 +++++----- .../engines/high_throughput/interchange.py | 4 ++-- .../engines/high_throughput/manager.py | 3 +++ .../engines/high_throughput/worker.py | 1 + .../engines/high_throughput/zmq_pipes.py | 2 +- compute_endpoint/tests/conftest.py | 4 ++-- .../endpoint/executors/test_gcengine_retries.py | 2 +- 8 files changed, 17 insertions(+), 11 deletions(-) diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py b/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py index 86f763b4e..30dff20ec 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/taskqueue.py @@ -91,6 +91,8 @@ def __init__( if linger is not None: self.zmq_socket.setsockopt(zmq.LINGER, linger) + self.zmq_socket.setsockopt(zmq.IPV6, True) + # all zmq setsockopt calls must be done before bind/connect is called if self.mode == "server": self.zmq_socket.bind(f"tcp://*:{port}") diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py index 05f3eccf6..35bc763f0 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py @@ -244,7 +244,7 @@ def __init__( # Tuning info prefetch_capacity=10, provider=LocalProvider(), - address="127.0.0.1", + address="::1", worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), @@ -379,13 +379,13 @@ def start( self.endpoint_id = endpoint_id self.outgoing_q = zmq_pipes.TasksOutgoing( - "localhost", self.interchange_port_range + "::1", self.interchange_port_range ) self.incoming_q = zmq_pipes.ResultsIncoming( - "localhost", self.interchange_port_range + "::1", self.interchange_port_range ) self.command_client = zmq_pipes.CommandClient( - "localhost", self.interchange_port_range + "::1", self.interchange_port_range ) self.is_alive = True @@ -454,7 +454,7 @@ def _start_local_interchange_process(self): name="Engine-Interchange", args=(comm_q,), kwargs={ - "client_address": "localhost", # engine and ix are on same node + "client_address": "::1", # engine and ix are on same node "client_ports": ( self.outgoing_q.port, self.incoming_q.port, diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py index 3c8fa2c81..acb39185f 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/interchange.py @@ -113,8 +113,8 @@ def __init__( worker_mode=None, cold_routing_interval=10.0, scaling_enabled=True, - client_address="localhost", - interchange_address="localhost", + client_address="127.0.0.1", + interchange_address="127.0.0.1", client_ports: tuple[int, int, int] = (50055, 50056, 50057), worker_ports=None, worker_port_range=None, diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py index 8cbda14fd..bf51f243e 100755 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/manager.py @@ -171,6 +171,7 @@ def __init__( # Linger is set to 0, so that the manager can exit even when there might be # messages in the pipe self.task_incoming.setsockopt(zmq.LINGER, 0) + self.task_incoming.setsockopt(zmq.IPV6, True) self.task_incoming.connect(task_q_url) self.logdir = logdir @@ -179,6 +180,7 @@ def __init__( self.result_outgoing = self.context.socket(zmq.DEALER) self.result_outgoing.setsockopt(zmq.IDENTITY, uid.encode("utf-8")) self.result_outgoing.setsockopt(zmq.LINGER, 0) + self.result_outgoing.setsockopt(zmq.IPV6, True) self.result_outgoing.connect(result_q_url) log.info("Manager connected") @@ -213,6 +215,7 @@ def __init__( self.funcx_task_socket = self.context.socket(zmq.ROUTER) self.funcx_task_socket.set_hwm(0) + self.funcx_task_socket.setsockopt(zmq.IPV6, True) self.address = "localhost" self.worker_port = self.funcx_task_socket.bind_to_random_port( "tcp://*", diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/worker.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/worker.py index 2acb374b3..af8aeca9d 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/worker.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/worker.py @@ -79,6 +79,7 @@ def __init__( self.task_socket = self.context.socket(zmq.DEALER) self.task_socket.setsockopt(zmq.IDENTITY, self.identity) + self.task_socket.setsockopt(zmq.IPV6, True) log.info(f"Trying to connect to : tcp://{self.address}:{self.port}") self.task_socket.connect(f"tcp://{self.address}:{self.port}") diff --git a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/zmq_pipes.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/zmq_pipes.py index ff493337a..aac200d55 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/zmq_pipes.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/zmq_pipes.py @@ -39,7 +39,7 @@ def _zmq_create_socket_port(context: zmq.Context, ip_address: str | int, port_ra sock.set_hwm(0) # This option should work for both IPv4 and IPv6...? # May not work until Parsl is updated? - # sock.setsockopt(zmq.IPV6, True) + sock.setsockopt(zmq.IPV6, True) port = sock.bind_to_random_port( f"tcp://{_zmq_canonicalize_address(ip_address)}", diff --git a/compute_endpoint/tests/conftest.py b/compute_endpoint/tests/conftest.py index 4f8341f02..6c5776a09 100644 --- a/compute_endpoint/tests/conftest.py +++ b/compute_endpoint/tests/conftest.py @@ -139,7 +139,7 @@ def _runner(engine_type: t.Type[GlobusComputeEngineBase], **kwargs): k = dict(max_workers=2) elif engine_type is engines.GlobusComputeEngine: k = dict( - address="localhost", + address="::1", heartbeat_period=engine_heartbeat, heartbeat_threshold=2, job_status_kwargs=dict(max_idletime=0, strategy_period=0.1), @@ -153,7 +153,7 @@ def _runner(engine_type: t.Type[GlobusComputeEngineBase], **kwargs): """ k = dict( - address="localhost", + address="::1", heartbeat_period=engine_heartbeat, heartbeat_threshold=1, mpi_launcher="mpiexec", diff --git a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py index 914bca982..ed800dde5 100644 --- a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py +++ b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py @@ -100,5 +100,5 @@ def test_repeated_fail(mock_gce, ez_pack_task): def test_default_retries_is_0(): - engine = GlobusComputeEngine(address="localhost") + engine = GlobusComputeEngine(address="::1") assert engine.max_retries_on_system_failure == 0, "Users must knowingly opt-in"