Skip to content

Commit

Permalink
Update latest parsl release
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Nov 27, 2024
2 parents 06bb080 + 7c2646e commit 78bcfa5
Show file tree
Hide file tree
Showing 32 changed files with 208 additions and 226 deletions.
20 changes: 19 additions & 1 deletion parsl/addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
so some experimentation will probably be needed to choose the correct one.
"""

import ipaddress
import logging
import platform
import socket
Expand All @@ -17,7 +18,7 @@
except ImportError:
fcntl = None # type: ignore[assignment]
import struct
from typing import Callable, List, Set
from typing import Callable, List, Set, Union

import psutil
import typeguard
Expand Down Expand Up @@ -156,3 +157,20 @@ def get_any_address() -> str:
if addr == '':
raise Exception('Cannot find address of the local machine.')
return addr


def tcp_url(address: str, port: Union[str, int, None] = None) -> str:
    """Construct a tcp url safe for IPv4 and IPv6.

    Args:
        address: An IPv4 or IPv6 address literal, or ``"*"`` for the
            wildcard address.
        port: Optional port. A falsy value (``None``, ``""``, ``0``)
            yields a URL without a port component.

    Returns:
        ``tcp://<address>`` or ``tcp://<address>:<port>``. An IPv6 literal
        is wrapped in square brackets when a port is appended, so the
        port separator cannot be confused with the colons of the address.
        Without a port the IPv6 literal is left unbracketed.

    Raises:
        ValueError: If ``address`` is neither ``"*"`` nor a valid IP
            address literal (raised by ``ipaddress.ip_address``).
    """
    port_suffix = f":{port}" if port else ""

    if address == "*":
        # "*" is not an IP literal, so it must bypass ip_address(); the
        # port is still kept so that the URL stays usable for bind().
        return f"tcp://*{port_suffix}"

    ip_addr = ipaddress.ip_address(address)

    if ip_addr.version == 6 and port_suffix:
        return f"tcp://[{address}]{port_suffix}"

    return f"tcp://{address}{port_suffix}"
28 changes: 0 additions & 28 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,3 @@ def script_dir(self) -> str:
@script_dir.setter
def script_dir(self, value: str) -> None:
pass

@abstractmethod
def push_file(self, source: str, dest_dir: str) -> str:
    """Move a file from ``source`` into the destination directory.

    Concrete channels implement the actual transfer.

    Args:
        source (string) : Full filepath of the file to be moved
        dest_dir (string) : Absolute path of the directory to move to

    Returns:
        destination_path (string)
    """

@abstractmethod
def pull_file(self, remote_source: str, local_dir: str) -> str:
    """Transport a file on the remote side to a local directory.

    Concrete channels implement the actual transfer.

    Args:
        remote_source (string): remote_source
        local_dir (string): Local directory to copy to

    Returns:
        destination_path (string)
    """
36 changes: 0 additions & 36 deletions parsl/channels/local/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import logging
import os
import shutil
import subprocess

from parsl.channels.base import Channel
from parsl.channels.errors import FileCopyException
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,40 +55,6 @@ def execute_wait(self, cmd, walltime=None):

return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))

def push_file(self, source, dest_dir):
    """Place ``source`` in ``dest_dir`` and make it owner-accessible only.

    When the file already sits in ``dest_dir`` no copy is made; only the
    permissions are set. Otherwise the file is copied first.

    Args:
        - source (string) : Path to the source file
        - dest_dir (string) : Path to the directory to which the file is to be copied

    Returns:
        - destination_path (String) : Path of the destination file

    Raises:
        - FileCopyException : If file copy failed.
    """
    target = os.path.join(dest_dir, os.path.basename(source))

    # Source already lives in the target directory: nothing to copy.
    if os.path.dirname(source) == dest_dir:
        os.chmod(target, 0o700)
        return target

    try:
        shutil.copyfile(source, target)
        os.chmod(target, 0o700)
    except OSError as e:
        raise FileCopyException(e, "localhost")

    return target

def pull_file(self, remote_source, local_dir):
    """Fetch ``remote_source`` into ``local_dir`` by delegating to ``push_file``.

    Returns whatever ``push_file`` returns for the same arguments.
    """
    destination_path = self.push_file(remote_source, local_dir)
    return destination_path

@property
def script_dir(self):
return self._script_dir
Expand Down
4 changes: 4 additions & 0 deletions parsl/curvezmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
except zmq.ZMQError as e:
raise ValueError("Invalid CurveZMQ key format") from e
sock.setsockopt(zmq.CURVE_SERVER, True) # Must come before bind

# This flag enables IPV6 in addition to IPV4
sock.setsockopt(zmq.IPV6, True)
return sock

def term(self):
Expand Down Expand Up @@ -202,4 +205,5 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
sock.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
except zmq.ZMQError as e:
raise ValueError("Invalid CurveZMQ key format") from e
sock.setsockopt(zmq.IPV6, True)
return sock
2 changes: 0 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ def __init__(self, config: Config) -> None:
self.monitoring = config.monitoring

if self.monitoring:
if self.monitoring.logdir is None:
self.monitoring.logdir = self.run_dir
self.monitoring.start(self.run_dir, self.config.run_dir)

self.time_began = datetime.datetime.now()
Expand Down
37 changes: 37 additions & 0 deletions parsl/executors/execute_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os

from parsl.serialize import unpack_res_spec_apply_message


def execute_task(bufs: bytes):
    """Deserialize the buffer and execute the task.

    The buffer is unpacked into a callable, its positional and keyword
    arguments, and a resource specification. Each resource specification
    entry is exported as a ``PARSL_<NAME>`` environment variable (name
    upper-cased, value stringified) before the callable is invoked.

    Returns the result or throws exception.
    """
    f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs)

    for varname in resource_spec:
        envname = "PARSL_" + str(varname).upper()
        os.environ[envname] = str(resource_spec[varname])

    # We might need to look into callability of the function from itself
    # since we change its name in the new namespace
    prefix = "parsl_"
    fname = prefix + "f"
    argname = prefix + "args"
    kwargname = prefix + "kwargs"
    resultname = prefix + "result"

    code = f"{resultname} = {fname}(*{argname}, **{kwargname})"

    # Use a dedicated namespace rather than locals(): this avoids leaking
    # unrelated local variables into it and does not depend on locals()
    # returning a writable view of the frame.
    user_ns = {
        '__builtins__': __builtins__,
        fname: f,
        argname: args,
        kwargname: kwargs,
        resultname: None,
    }

    exec(code, user_ns, user_ns)
    return user_ns.get(resultname)
2 changes: 1 addition & 1 deletion parsl/executors/flux/execute_parsl_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging
import os

from parsl.executors.execute_task import execute_task
from parsl.executors.flux import TaskResult
from parsl.executors.high_throughput.process_worker_pool import execute_task
from parsl.serialize import serialize


Expand Down
19 changes: 14 additions & 5 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,19 @@
address : string
An address to connect to the main Parsl process which is reachable from the network in which
workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx).
workers will be running. This field expects an IPv4 or IPv6 address.
Most login nodes on clusters have several network interfaces available, only some of which
can be reached from the compute nodes. This field can be used to limit the executor to listen
only on a specific interface, and limiting connections to the internal network.
By default, the executor will attempt to enumerate and connect through all possible addresses.
Setting an address here overrides the default behavior.
default=None
loopback_address: string
Specify address used for internal communication between executor and interchange.
Supports IPv4 and IPv6 addresses.
default=127.0.0.1
worker_ports : (int, int)
Specify the ports to be used by workers to connect to Parsl. If this option is specified,
worker_port_range will not be honored.
Expand Down Expand Up @@ -224,6 +229,7 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
Parsl will create names as integers starting with 0.
default: empty list
"""

@typeguard.typechecked
Expand All @@ -233,6 +239,7 @@ def __init__(self,
launch_cmd: Optional[str] = None,
interchange_launch_cmd: Optional[Sequence[str]] = None,
address: Optional[str] = None,
loopback_address: str = "127.0.0.1",
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000),
Expand Down Expand Up @@ -268,6 +275,8 @@ def __init__(self,
self.address = address
self.address_probe_timeout = address_probe_timeout
self.manager_selector = manager_selector
self.loopback_address = loopback_address

if self.address:
self.all_addresses = address
else:
Expand Down Expand Up @@ -408,13 +417,13 @@ def start(self):
)

self.outgoing_q = zmq_pipes.TasksOutgoing(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self.loopback_address, self.interchange_port_range, self.cert_dir
)
self.incoming_q = zmq_pipes.ResultsIncoming(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self.loopback_address, self.interchange_port_range, self.cert_dir
)
self.command_client = zmq_pipes.CommandClient(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self.loopback_address, self.interchange_port_range, self.cert_dir
)

self._result_queue_thread = None
Expand Down Expand Up @@ -515,7 +524,7 @@ def _start_local_interchange_process(self) -> None:
get the worker task and result ports that the interchange has bound to.
"""

interchange_config = {"client_address": "127.0.0.1",
interchange_config = {"client_address": self.loopback_address,
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
Expand Down
15 changes: 8 additions & 7 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import zmq

from parsl import curvezmq
from parsl.addresses import tcp_url
from parsl.app.errors import RemoteExceptionWrapper
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
from parsl.executors.high_throughput.manager_record import ManagerRecord
Expand Down Expand Up @@ -115,13 +116,13 @@ def __init__(self,
self.zmq_context = curvezmq.ServerContext(self.cert_dir)
self.task_incoming = self.zmq_context.socket(zmq.DEALER)
self.task_incoming.set_hwm(0)
self.task_incoming.connect("tcp://{}:{}".format(client_address, client_ports[0]))
self.task_incoming.connect(tcp_url(client_address, client_ports[0]))
self.results_outgoing = self.zmq_context.socket(zmq.DEALER)
self.results_outgoing.set_hwm(0)
self.results_outgoing.connect("tcp://{}:{}".format(client_address, client_ports[1]))
self.results_outgoing.connect(tcp_url(client_address, client_ports[1]))

self.command_channel = self.zmq_context.socket(zmq.REP)
self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2]))
self.command_channel.connect(tcp_url(client_address, client_ports[2]))
logger.info("Connected to client")

self.run_id = run_id
Expand All @@ -144,14 +145,14 @@ def __init__(self,
self.worker_task_port = self.worker_ports[0]
self.worker_result_port = self.worker_ports[1]

self.task_outgoing.bind(f"tcp://{self.interchange_address}:{self.worker_task_port}")
self.results_incoming.bind(f"tcp://{self.interchange_address}:{self.worker_result_port}")
self.task_outgoing.bind(tcp_url(self.interchange_address, self.worker_task_port))
self.results_incoming.bind(tcp_url(self.interchange_address, self.worker_result_port))

else:
self.worker_task_port = self.task_outgoing.bind_to_random_port(f"tcp://{self.interchange_address}",
self.worker_task_port = self.task_outgoing.bind_to_random_port(tcp_url(self.interchange_address),
min_port=worker_port_range[0],
max_port=worker_port_range[1], max_tries=100)
self.worker_result_port = self.results_incoming.bind_to_random_port(f"tcp://{self.interchange_address}",
self.worker_result_port = self.results_incoming.bind_to_random_port(tcp_url(self.interchange_address),
min_port=worker_port_range[0],
max_port=worker_port_range[1], max_tries=100)

Expand Down
2 changes: 2 additions & 0 deletions parsl/executors/high_throughput/mpi_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self,
launch_cmd: Optional[str] = None,
interchange_launch_cmd: Optional[str] = None,
address: Optional[str] = None,
loopback_address: str = "127.0.0.1",
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000),
Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(self,
launch_cmd=launch_cmd,
interchange_launch_cmd=interchange_launch_cmd,
address=address,
loopback_address=loopback_address,
worker_ports=worker_ports,
worker_port_range=worker_port_range,
interchange_port_range=interchange_port_range,
Expand Down
5 changes: 2 additions & 3 deletions parsl/executors/high_throughput/mpi_resource_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ def put_task(self, task_package: dict):
"""Schedule task if resources are available otherwise backlog the task"""
user_ns = locals()
user_ns.update({"__builtins__": __builtins__})
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(
task_package["buffer"], user_ns, copy=False
)
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])

nodes_needed = resource_spec.get("num_nodes")
if nodes_needed:
Expand All @@ -177,6 +175,7 @@ def put_task(self, task_package: dict):
self._map_tasks_to_nodes[task_package["task_id"]] = allocated_nodes
buffer = pack_res_spec_apply_message(_f, _args, _kwargs, resource_spec)
task_package["buffer"] = buffer
task_package["resource_spec"] = resource_spec

self.pending_task_q.put(task_package)

Expand Down
8 changes: 4 additions & 4 deletions parsl/executors/high_throughput/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import zmq
from zmq.utils.monitor import recv_monitor_message

from parsl.addresses import get_all_addresses
from parsl.addresses import get_all_addresses, tcp_url

logger = logging.getLogger(__name__)

Expand All @@ -32,7 +32,8 @@ def probe_addresses(addresses, task_port, timeout=120):
for addr in addresses:
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
url = "tcp://{}:{}".format(addr, task_port)
socket.setsockopt(zmq.IPV6, True)
url = tcp_url(addr, task_port)
logger.debug("Trying to connect back on {}".format(url))
socket.connect(url)
addr_map[addr] = {'sock': socket,
Expand Down Expand Up @@ -71,8 +72,7 @@ def __init__(self, addresses, port):

address = probe_addresses(addresses, port)
print("Viable address :", address)
self.task_incoming.connect("tcp://{}:{}".format(address, port))
print("Here")
self.task_incoming.connect(tcp_url(address, port))

def heartbeat(self):
""" Send heartbeat to the incoming task queue
Expand Down
Loading

0 comments on commit 78bcfa5

Please sign in to comment.