Skip to content

Commit

Permalink
remove 127.0.0.1 usage and rename to localhost for ipv6 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
LeiGlobus committed Nov 22, 2024
1 parent 3733a12 commit 565333e
Show file tree
Hide file tree
Showing 24 changed files with 154 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def setup_server_auth(self):
# Start an authenticator for this context.
self.auth = ThreadAuthenticator(self.context)
self.auth.start()
self.auth.allow("127.0.0.1")
self.auth.allow("localhost")
# Tell the authenticator how to handle CURVE requests

if not self.ironhouse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import multiprocessing
import os
import queue
import socket
import threading
import time
import typing as t
Expand Down Expand Up @@ -299,14 +300,15 @@ def __init__(
self.endpoint_id = endpoint_id
self._task_counter = 0

try:
ipaddress.ip_address(address=address)
except Exception:
log.critical(
f"Invalid address supplied: {address}. "
"Please use a valid IPv4 or IPv6 address"
if not HighThroughputEngine.is_hostname_or_ip(address):
err_msg = (
# yes, suggesting `=` formatter, so it's clear which argument.
f"Invalid address: {address=}\n\n"
"Expecting an interface name, hostname, IPv4 address, or IPv6 address."
)
raise
log.critical(err_msg)
raise ValueError(err_msg)

self.address = address
self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
Expand Down Expand Up @@ -377,13 +379,13 @@ def start(
self.endpoint_id = endpoint_id

self.outgoing_q = zmq_pipes.TasksOutgoing(
"127.0.0.1", self.interchange_port_range
"localhost", self.interchange_port_range
)
self.incoming_q = zmq_pipes.ResultsIncoming(
"127.0.0.1", self.interchange_port_range
"localhost", self.interchange_port_range
)
self.command_client = zmq_pipes.CommandClient(
"127.0.0.1", self.interchange_port_range
"localhost", self.interchange_port_range
)

self.is_alive = True
Expand Down Expand Up @@ -419,6 +421,27 @@ def start(

return self.outgoing_q.port, self.incoming_q.port, self.command_client.port

@staticmethod
def is_hostname_or_ip(hostname_or_ip: str) -> bool:
"""
Utility method to verify that the input is a valid hostname or
IP address.
"""
if not hostname_or_ip:
return False
else:
try:
socket.gethostbyname(hostname_or_ip)
return True
except socket.gaierror:
# Not a hostname, now check IP
pass
try:
ipaddress.ip_address(address=hostname_or_ip)
except ValueError:
return False
return True

def _start_local_interchange_process(self):
"""Starts the interchange process locally
Expand All @@ -431,7 +454,7 @@ def _start_local_interchange_process(self):
name="Engine-Interchange",
args=(comm_q,),
kwargs={
"client_address": "127.0.0.1", # engine and ix are on the same node
"client_address": "localhost", # engine and ix are on same node
"client_ports": (
self.outgoing_q.port,
self.incoming_q.port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ def __init__(
worker_mode=None,
cold_routing_interval=10.0,
scaling_enabled=True,
client_address="127.0.0.1",
interchange_address="127.0.0.1",
client_address="localhost",
interchange_address="localhost",
client_ports: tuple[int, int, int] = (50055, 50056, 50057),
worker_ports=None,
worker_port_range=None,
Expand All @@ -134,11 +134,11 @@ def __init__(
client_address : str
The ip address at which the parsl client can be reached.
Default: "127.0.0.1"
Default: "localhost"
interchange_address : str
The ip address at which the workers will be able to reach the Interchange.
Default: "127.0.0.1"
Default: "localhost"
client_ports : tuple[int, int, int]
The ports at which the client can be reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class Manager:

def __init__(
self,
task_q_url="tcp://127.0.0.1:50097",
result_q_url="tcp://127.0.0.1:50098",
task_q_url="tcp://localhost:50097",
result_q_url="tcp://localhost:50098",
max_queue_size=10,
cores_per_worker=1,
available_accelerators: list[str] | None = None,
Expand Down Expand Up @@ -213,7 +213,7 @@ def __init__(

self.funcx_task_socket = self.context.socket(zmq.ROUTER)
self.funcx_task_socket.set_hwm(0)
self.address = "127.0.0.1"
self.address = "localhost"
self.worker_port = self.funcx_task_socket.bind_to_random_port(
"tcp://*",
min_port=self.internal_worker_port_range[0],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Worker:
Worker id string
address : str
Address at which the manager might be reached. This is usually 127.0.0.1
Address at which the manager might be reached. This is usually localhost
port : int
Port at which the manager can be reached
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env python3

from __future__ import annotations

import ipaddress
import logging
import time

Expand All @@ -10,6 +13,40 @@
log = logging.getLogger(__name__)


def _zmq_canonicalize_address(addr: str | int) -> str:
try:
ip = ipaddress.ip_address(addr)
except ValueError:
# Not a valid IPv4 or IPv6 address
if isinstance(addr, int):
# If it was an integer, then it's just plain invalid
raise

# Otherwise, it was likely a hostname; let another layer deal with it
return addr

if ip.version == 4:
return str(ip) # like "12.34.56.78"
elif ip.version == 6:
return f"[{ip}]" # like "[::1]"


def _zmq_create_socket_port(context: zmq.Context, ip_address: str | int, port_range):
"""
Utility method with logic shared by all the pipes
"""
sock = context.socket(zmq.DEALER)
sock.set_hwm(0)
# This option should work for both IPv4 and IPv6
sock.setsockopt(zmq.IPV6, True)
port = sock.bind_to_random_port(
f"tcp://{_zmq_canonicalize_address(ip_address)}",
min_port=port_range[0],
max_port=port_range[1],
)
return sock, port


class CommandClient:
"""CommandClient"""

Expand All @@ -24,13 +61,10 @@ def __init__(self, ip_address, port_range):
Port range for the comms between client and interchange
"""

self.context = zmq.Context()
self.zmq_socket = self.context.socket(zmq.DEALER)
self.zmq_socket.set_hwm(0)
self.port = self.zmq_socket.bind_to_random_port(
f"tcp://{ip_address}",
min_port=port_range[0],
max_port=port_range[1],
self.zmq_socket, self.port = _zmq_create_socket_port(
self.context, ip_address, port_range
)

def run(self, message):
Expand Down Expand Up @@ -66,12 +100,8 @@ def __init__(self, ip_address, port_range):
"""
self.context = zmq.Context()
self.zmq_socket = self.context.socket(zmq.DEALER)
self.zmq_socket.set_hwm(0)
self.port = self.zmq_socket.bind_to_random_port(
f"tcp://{ip_address}",
min_port=port_range[0],
max_port=port_range[1],
self.zmq_socket, self.port = _zmq_create_socket_port(
self.context, ip_address, port_range
)
self.poller = zmq.Poller()
self.poller.register(self.zmq_socket, zmq.POLLOUT)
Expand Down Expand Up @@ -141,12 +171,8 @@ def __init__(self, ip_address, port_range):
"""
self.context = zmq.Context()
self.results_receiver = self.context.socket(zmq.DEALER)
self.results_receiver.set_hwm(0)
self.port = self.results_receiver.bind_to_random_port(
f"tcp://{ip_address}",
min_port=port_range[0],
max_port=port_range[1],
self.results_receiver, self.port = _zmq_create_socket_port(
self.context, ip_address, port_range
)

def get(self, block=True, timeout=None):
Expand Down
4 changes: 2 additions & 2 deletions compute_endpoint/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _runner(engine_type: t.Type[GlobusComputeEngineBase], **kwargs):
k = dict(max_workers=2)
elif engine_type is engines.GlobusComputeEngine:
k = dict(
address="127.0.0.1",
address="localhost",
heartbeat_period=engine_heartbeat,
heartbeat_threshold=2,
job_status_kwargs=dict(max_idletime=0, strategy_period=0.1),
Expand All @@ -153,7 +153,7 @@ def _runner(engine_type: t.Type[GlobusComputeEngineBase], **kwargs):
"""

k = dict(
address="127.0.0.1",
address="localhost",
heartbeat_period=engine_heartbeat,
heartbeat_threshold=1,
mpi_launcher="mpiexec",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ def test_with_funcx_config(self, mocker):
mock_interchange.return_value.stop.return_value = None

mock_optionals = {}
mock_optionals["interchange_address"] = "127.0.0.1"
mock_optionals["interchange_address"] = "localhost"

mock_funcx_config = {}
mock_funcx_config["endpoint_address"] = "127.0.0.1"
mock_funcx_config["endpoint_address"] = "localhost"

manager = Endpoint(funcx_dir=os.getcwd())
manager.name = "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
def gc_engine_scaling(tmp_path):
ep_id = uuid.uuid4()
engine = GlobusComputeEngine(
address="127.0.0.1",
address="localhost",
heartbeat_period=1,
heartbeat_threshold=2,
provider=LocalProvider(
Expand All @@ -37,7 +37,7 @@ def gc_engine_scaling(tmp_path):
def gc_engine_non_scaling(tmp_path):
ep_id = uuid.uuid4()
engine = GlobusComputeEngine(
address="127.0.0.1",
address="localhost",
heartbeat_period=1,
heartbeat_threshold=2,
provider=LocalProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def test_add_worker(self, mocker):
worker_map = WorkerMap(1, [])
worker = worker_map.add_worker(
worker_id="0",
address="127.0.0.1",
address="localhost",
debug=logging.DEBUG,
uid="test1",
logdir=os.getcwd(),
Expand All @@ -31,7 +31,7 @@ def test_add_worker(self, mocker):
worker_map = WorkerMap(1, ["0"])
worker_map.add_worker(
worker_id="1",
address="127.0.0.1",
address="localhost",
debug=logging.DEBUG,
uid="test1",
logdir=os.getcwd(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ def test_repeated_fail(mock_gce, ez_pack_task):


def test_default_retries_is_0():
engine = GlobusComputeEngine(address="127.0.0.1")
engine = GlobusComputeEngine(address="localhost")
assert engine.max_retries_on_system_failure == 0, "Users must knowingly opt-in"
2 changes: 1 addition & 1 deletion compute_endpoint/tests/unit/test_bad_endpoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
_MOCK_BASE = "globus_compute_endpoint.engines.high_throughput.engine."


@pytest.mark.parametrize("address", ("localhost", "login1.theta.alcf.anl.gov", "*"))
@pytest.mark.parametrize("address", ("example", "a.b.c.d.e", "*"))
def test_invalid_address(address, htex_warns):
with mock.patch(f"{_MOCK_BASE}log") as mock_log:
with pytest.raises(ValueError):
Expand Down
2 changes: 1 addition & 1 deletion compute_endpoint/tests/unit/test_boot_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def fake_ep_dir(fs: fakefs.FakeFilesystem, ep_name) -> pathlib.Path:
display_name: null
engine:
type: GlobusComputeEngine
address: 127.0.0.1
address: localhost
provider:
type: LocalProvider
init_blocks: 1
Expand Down
6 changes: 3 additions & 3 deletions compute_endpoint/tests/unit/test_cli_behavior.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def func(name=ep_name, ep_uuid=None):
display_name: null
engine:
type: GlobusComputeEngine
address: 127.0.0.1
address: localhost
provider:
type: LocalProvider
init_blocks: 1
Expand All @@ -112,7 +112,7 @@ def func(name=ep_name, ep_uuid=None):
heartbeat_period: {{ heartbeat }}
engine:
type: GlobusComputeEngine
address: 127.0.0.1
address: localhost
provider:
type: LocalProvider
init_blocks: 1
Expand Down Expand Up @@ -463,7 +463,7 @@ def test_config_yaml_display_none(run_line, mock_command_ensure, display_name):
run_line(config_cmd)

conf_dict = dict(yaml.safe_load(conf.read_text()))
conf_dict["engine"]["address"] = "127.0.0.1" # avoid unnecessary DNS lookup
conf_dict["engine"]["address"] = "localhost" # avoid unnecessary DNS lookup
conf = load_config_yaml(yaml.safe_dump(conf_dict))

assert conf.display_name is None, conf.display_name
Expand Down
6 changes: 3 additions & 3 deletions compute_endpoint/tests/unit/test_endpoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

@pytest.fixture
def config_dict():
return {"engine": {"type": "GlobusComputeEngine", "address": "127.0.0.1"}}
return {"engine": {"type": "GlobusComputeEngine", "address": "localhost"}}


@pytest.fixture
Expand Down Expand Up @@ -139,7 +139,7 @@ def test_conditional_engine_strategy(
):
config_dict["engine"]["type"] = engine_type
config_dict["engine"]["strategy"] = strategy
config_dict["engine"]["address"] = "127.0.0.1"
config_dict["engine"]["address"] = "localhost"

if engine_type == "GlobusComputeEngine":
if isinstance(strategy, str) or strategy is None:
Expand Down Expand Up @@ -172,7 +172,7 @@ def test_provider_container_compatibility(
):
config_dict["engine"]["container_uri"] = "docker://ubuntu"
config_dict["engine"]["provider"] = {"type": provider_type}
config_dict["engine"]["address"] = "127.0.0.1"
config_dict["engine"]["address"] = "localhost"

if compatible:
UserEndpointConfigModel(**config_dict)
Expand Down
Loading

0 comments on commit 565333e

Please sign in to comment.