Skip to content

Commit

Permalink
update to latest Parsl master; remove some defunct monitoring prototypes
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Dec 11, 2024
2 parents 78bcfa5 + a01f7e4 commit 92114c6
Show file tree
Hide file tree
Showing 80 changed files with 357 additions and 719 deletions.
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Parsl - Parallel Scripting Library
==================================
|licence| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS|
|licence| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |NumFOCUS| |CZI-EOSS|

Parsl extends parallelism in Python beyond a single computer.

Expand Down Expand Up @@ -64,6 +64,9 @@ then explore the `parallel computing patterns <https://parsl.readthedocs.io/en/s
.. |CZI-EOSS| image:: https://chanzuckerberg.github.io/open-science/badges/CZI-EOSS.svg
:target: https://czi.co/EOSS
:alt: CZI's Essential Open Source Software for Science
.. |NumFOCUS| image:: https://img.shields.io/badge/powered%20by-NumFOCUS-orange.svg?style=flat&colorA=E1523D&colorB=007D8A
:target: https://numfocus.org
:alt: Powered by NumFOCUS


Quickstart
Expand Down
2 changes: 0 additions & 2 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ Exceptions
parsl.providers.errors.ScaleOutFailed
parsl.providers.errors.SchedulerMissingArgs
parsl.providers.errors.ScriptPathError
parsl.channels.errors.ChannelError
parsl.channels.errors.FileCopyException
parsl.executors.high_throughput.errors.WorkerLost
parsl.executors.high_throughput.interchange.ManagerLost
parsl.serialize.errors.DeserializationError
Expand Down
1 change: 0 additions & 1 deletion docs/userguide/monitoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
],
monitoring=MonitoringHub(
hub_address=address_by_hostname(),
hub_port=55055,
monitoring_debug=False,
resource_monitoring_interval=10,
),
Expand Down
6 changes: 0 additions & 6 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ disallow_any_expr = True
[mypy-parsl.executors.high_throughput.interchange.*]
disallow_untyped_defs = True

[mypy-parsl.monitoring.node_reporter.*]
# because I haven't written the node reporter properly yet
check_untyped_defs = False
disallow_untyped_defs = False
no_implicit_optional = False

[mypy-parsl.monitoring.*]
disallow_untyped_decorators = True
check_untyped_defs = True
Expand Down
4 changes: 0 additions & 4 deletions parsl/channels/__init__.py

This file was deleted.

54 changes: 0 additions & 54 deletions parsl/channels/base.py

This file was deleted.

30 changes: 0 additions & 30 deletions parsl/channels/errors.py

This file was deleted.

66 changes: 0 additions & 66 deletions parsl/channels/local/local.py

This file was deleted.

1 change: 0 additions & 1 deletion parsl/configs/ASPIRE1.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
],
monitoring=MonitoringHub(
hub_address=address_by_interface('ib0'),
hub_port=55055,
resource_monitoring_interval=10,
),
strategy='simple',
Expand Down
2 changes: 0 additions & 2 deletions parsl/configs/cc_in2p3.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import GridEngineProvider
Expand All @@ -10,7 +9,6 @@
label='cc_in2p3_htex',
max_workers_per_node=2,
provider=GridEngineProvider(
channel=LocalChannel(),
nodes_per_block=1,
init_blocks=2,
max_blocks=2,
Expand Down
2 changes: 0 additions & 2 deletions parsl/configs/frontera.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SrunLauncher
Expand All @@ -15,7 +14,6 @@
max_workers_per_node=1, # Set number of workers per node
provider=SlurmProvider(
cmd_timeout=60, # Add extra time for slow scheduler responses
channel=LocalChannel(),
nodes_per_block=2,
init_blocks=1,
min_blocks=1,
Expand Down
2 changes: 0 additions & 2 deletions parsl/configs/htex_local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
Expand All @@ -10,7 +9,6 @@
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
),
Expand Down
2 changes: 0 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,8 +1201,6 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
os.makedirs(executor.provider.script_dir, exist_ok=True)

executor.provider.channel.script_dir = executor.provider.script_dir

self.executors[executor.label] = executor
executor.start()
block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)]
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down
17 changes: 15 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def __init__(self,
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
self.interchange_launch_cmd = interchange_launch_cmd

self._result_queue_thread_exit = threading.Event()
self._result_queue_thread: Optional[threading.Thread] = None

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"
Expand Down Expand Up @@ -455,9 +458,11 @@ def _result_queue_worker(self):
"""
logger.debug("Result queue worker starting")

while not self.bad_state_is_set:
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
try:
msgs = self.incoming_q.get()
msgs = self.incoming_q.get(timeout_ms=self.poll_period)
if msgs is None: # timeout
continue

except IOError as e:
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
Expand Down Expand Up @@ -515,6 +520,8 @@ def _result_queue_worker(self):
else:
raise BadMessage("Message received with unknown type {}".format(msg['type']))

logger.info("Closing result ZMQ pipe")
self.incoming_q.close()
logger.info("Result queue worker finished")

def _start_local_interchange_process(self) -> None:
Expand Down Expand Up @@ -822,6 +829,8 @@ def shutdown(self, timeout: float = 10.0):

logger.info("Attempting HighThroughputExecutor shutdown")

logger.info("Terminating interchange and result queue thread")
self._result_queue_thread_exit.set()
self.interchange_proc.terminate()
try:
self.interchange_proc.wait(timeout=timeout)
Expand All @@ -846,6 +855,10 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Closing command client")
self.command_client.close()

logger.info("Waiting for result queue thread exit")
if self._result_queue_thread:
self._result_queue_thread.join()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down
17 changes: 13 additions & 4 deletions parsl/executors/high_throughput/zmq_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,24 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
min_port=port_range[0],
max_port=port_range[1])
self.poller = zmq.Poller()
self.poller.register(self.results_receiver, zmq.POLLIN)
self._lock = threading.Lock()

def get(self):
def get(self, timeout_ms=None):
"""Get a message from the queue, returning None if timeout expires
without a message. timeout is measured in milliseconds.
"""
logger.debug("Waiting for ResultsIncoming lock")
with self._lock:
logger.debug("Waiting for ResultsIncoming message")
m = self.results_receiver.recv_multipart()
logger.debug("Received ResultsIncoming message")
return m
socks = dict(self.poller.poll(timeout=timeout_ms))
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
m = self.results_receiver.recv_multipart()
logger.debug("Received ResultsIncoming message")
return m
else:
return None

def close(self):
with self._lock:
Expand Down
Loading

0 comments on commit 92114c6

Please sign in to comment.