diff --git a/README.rst b/README.rst index a8254e2e40..20988d2d86 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ Parsl - Parallel Scripting Library ================================== -|licence| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| +|licence| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |NumFOCUS| |CZI-EOSS| Parsl extends parallelism in Python beyond a single computer. @@ -64,6 +64,9 @@ then explore the `parallel computing patterns Tuple[int, str, str]: - ''' Executes the cmd, with a defined walltime. - - Args: - - cmd (string): Command string to execute over the channel - - walltime (int) : Timeout in seconds - - Returns: - - (exit_code, stdout, stderr) (int, string, string) - ''' - pass - - @abstractproperty - def script_dir(self) -> str: - ''' This is a property. Returns the directory assigned for storing all internal scripts such as - scheduler submit scripts. This is usually where error logs from the scheduler would reside on the - channel destination side. - - Args: - - None - - Returns: - - Channel script dir - ''' - pass - - # DFK expects to be able to modify this, so it needs to be in the abstract class - @script_dir.setter - def script_dir(self, value: str) -> None: - pass diff --git a/parsl/channels/errors.py b/parsl/channels/errors.py deleted file mode 100644 index effaea9548..0000000000 --- a/parsl/channels/errors.py +++ /dev/null @@ -1,30 +0,0 @@ -''' Exceptions raise by Apps. -''' -from parsl.errors import ParslError - - -class ChannelError(ParslError): - """ Base class for all exceptions - - Only to be invoked when only a more specific error is not available. 
- """ - def __init__(self, reason: str, e: Exception, hostname: str) -> None: - self.reason = reason - self.e = e - self.hostname = hostname - - def __str__(self) -> str: - return "Hostname:{0}, Reason:{1}".format(self.hostname, self.reason) - - -class FileCopyException(ChannelError): - ''' File copy operation failed - - Contains: - reason(string) - e (paramiko exception object) - hostname (string) - ''' - - def __init__(self, e: Exception, hostname: str) -> None: - super().__init__("File copy failed due to {0}".format(e), e, hostname) diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py deleted file mode 100644 index 6ef014ac19..0000000000 --- a/parsl/channels/local/local.py +++ /dev/null @@ -1,66 +0,0 @@ -import logging -import os -import subprocess - -from parsl.channels.base import Channel -from parsl.utils import RepresentationMixin - -logger = logging.getLogger(__name__) - - -class LocalChannel(Channel, RepresentationMixin): - ''' This is not even really a channel, since opening a local shell is not heavy - and done so infrequently that they do not need a persistent channel - ''' - - def __init__(self): - ''' Initialize the local channel. script_dir is required by set to a default. - - KwArgs: - - script_dir (string): Directory to place scripts - ''' - self.script_dir = None - - def execute_wait(self, cmd, walltime=None): - ''' Synchronously execute a commandline string on the shell. - - Args: - - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds - - Returns: - - retcode : Return code from the execution - - stdout : stdout string - - stderr : stderr string - ''' - try: - logger.debug("Creating process with command '%s'", cmd) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - preexec_fn=os.setpgrp - ) - logger.debug("Created process with pid %s. 
Performing communicate", proc.pid) - (stdout, stderr) = proc.communicate(timeout=walltime) - retcode = proc.returncode - logger.debug("Process %s returned %s", proc.pid, proc.returncode) - - except Exception: - logger.exception(f"Execution of command failed:\n{cmd}") - raise - else: - logger.debug("Execution of command in process %s completed normally", proc.pid) - - return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - - @property - def script_dir(self): - return self._script_dir - - @script_dir.setter - def script_dir(self, value): - if value is not None: - value = os.path.abspath(value) - self._script_dir = value diff --git a/parsl/configs/ASPIRE1.py b/parsl/configs/ASPIRE1.py index 7792f15dba..017e1061d7 100644 --- a/parsl/configs/ASPIRE1.py +++ b/parsl/configs/ASPIRE1.py @@ -34,7 +34,6 @@ ], monitoring=MonitoringHub( hub_address=address_by_interface('ib0'), - hub_port=55055, resource_monitoring_interval=10, ), strategy='simple', diff --git a/parsl/configs/cc_in2p3.py b/parsl/configs/cc_in2p3.py index 631d76f9f5..f0140da55a 100644 --- a/parsl/configs/cc_in2p3.py +++ b/parsl/configs/cc_in2p3.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import GridEngineProvider @@ -10,7 +9,6 @@ label='cc_in2p3_htex', max_workers_per_node=2, provider=GridEngineProvider( - channel=LocalChannel(), nodes_per_block=1, init_blocks=2, max_blocks=2, diff --git a/parsl/configs/frontera.py b/parsl/configs/frontera.py index a7b6f27b6c..25682bbe86 100644 --- a/parsl/configs/frontera.py +++ b/parsl/configs/frontera.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -15,7 +14,6 @@ max_workers_per_node=1, # Set number of workers per node provider=SlurmProvider( cmd_timeout=60, # Add extra time for slow scheduler responses - 
channel=LocalChannel(), nodes_per_block=2, init_blocks=1, min_blocks=1, diff --git a/parsl/configs/htex_local.py b/parsl/configs/htex_local.py index 721dea767e..57549a4728 100644 --- a/parsl/configs/htex_local.py +++ b/parsl/configs/htex_local.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -10,7 +9,6 @@ label="htex_local", cores_per_worker=1, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 48a56755c9..900111a097 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1201,8 +1201,6 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') os.makedirs(executor.provider.script_dir, exist_ok=True) - executor.provider.channel.script_dir = executor.provider.script_dir - self.executors[executor.label] = executor executor.start() block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)] diff --git a/parsl/executors/base.py b/parsl/executors/base.py index a112b9eb00..fc97db89d3 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index f96ce94566..b9dc086024 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -331,6 +331,9 @@ def __init__(self, interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD self.interchange_launch_cmd = interchange_launch_cmd + self._result_queue_thread_exit = threading.Event() + 
self._result_queue_thread: Optional[threading.Thread] = None + radio_mode = "htex" enable_mpi_mode: bool = False mpi_launcher: str = "mpiexec" @@ -455,9 +458,11 @@ def _result_queue_worker(self): """ logger.debug("Result queue worker starting") - while not self.bad_state_is_set: + while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set(): try: - msgs = self.incoming_q.get() + msgs = self.incoming_q.get(timeout_ms=self.poll_period) + if msgs is None: # timeout + continue except IOError as e: logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e)) @@ -515,6 +520,8 @@ def _result_queue_worker(self): else: raise BadMessage("Message received with unknown type {}".format(msg['type'])) + logger.info("Closing result ZMQ pipe") + self.incoming_q.close() logger.info("Result queue worker finished") def _start_local_interchange_process(self) -> None: @@ -822,6 +829,8 @@ def shutdown(self, timeout: float = 10.0): logger.info("Attempting HighThroughputExecutor shutdown") + logger.info("Terminating interchange and result queue thread") + self._result_queue_thread_exit.set() self.interchange_proc.terminate() try: self.interchange_proc.wait(timeout=timeout) @@ -846,6 +855,10 @@ def shutdown(self, timeout: float = 10.0): logger.info("Closing command client") self.command_client.close() + logger.info("Waiting for result queue thread exit") + if self._result_queue_thread: + self._result_queue_thread.join() + logger.info("Finished HighThroughputExecutor shutdown attempt") def get_usage_information(self): diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 1e469f1f47..4e185a298c 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -20,7 +20,8 @@ from parsl.executors.high_throughput.manager_record import ManagerRecord from parsl.executors.high_throughput.manager_selector import ManagerSelector from 
parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.zmq import ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle diff --git a/parsl/executors/high_throughput/zmq_pipes.py b/parsl/executors/high_throughput/zmq_pipes.py index ab1630e961..e7b39035cd 100644 --- a/parsl/executors/high_throughput/zmq_pipes.py +++ b/parsl/executors/high_throughput/zmq_pipes.py @@ -225,15 +225,24 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address), min_port=port_range[0], max_port=port_range[1]) + self.poller = zmq.Poller() + self.poller.register(self.results_receiver, zmq.POLLIN) self._lock = threading.Lock() - def get(self): + def get(self, timeout_ms=None): + """Get a message from the queue, returning None if timeout expires + without a message. timeout is measured in milliseconds. 
+ """ logger.debug("Waiting for ResultsIncoming lock") with self._lock: logger.debug("Waiting for ResultsIncoming message") - m = self.results_receiver.recv_multipart() - logger.debug("Received ResultsIncoming message") - return m + socks = dict(self.poller.poll(timeout=timeout_ms)) + if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN: + m = self.results_receiver.recv_multipart() + logger.debug("Received ResultsIncoming message") + return m + else: + return None def close(self): with self._lock: diff --git a/parsl/monitoring/davidadams_reporter.py b/parsl/monitoring/davidadams_reporter.py deleted file mode 100755 index f0ef30a4ec..0000000000 --- a/parsl/monitoring/davidadams_reporter.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python3 - -"""This script is for david adams to experiment with collecting -per-node data - ultimately to log to the parsl monitoring database, -but right now to look at which data fields to collect, and log them -to per-node CSV. - -This expects to receive parameters that align with the parsl work queue worker -such as the block ID. -""" - -import logging -import platform -import subprocess -import sys -import time -from typing import Any - -import psutil - -from parsl.log_utils import set_stream_logger - -logger = logging.getLogger("parsl.monitoring.davidadams_reporter") - - -if __name__ == "__main__": - - set_stream_logger() - logger.info(f"reporter starting, with args {sys.argv}") - - report_prefix = sys.argv[1] - - logger.info(f"will log to prefix {report_prefix}") - - args = sys.argv[2:] - logger.info(f"reporter launching workers with args {args}") - - set_stream_logger() - - hostname = platform.node() - csv_filename = report_prefix + "/" + hostname + "." 
+ str(time.time()) + ".csv" - - worker_process = subprocess.Popen(args) - - ret: Any = None - - reading = 0 - with open(csv_filename, "w") as csv_file: - while ret is None: - ret = worker_process.poll() - logger.info("sleeping in poll loop") - print(f"{time.time()},{reading},{psutil.cpu_percent()}", file=csv_file, flush=True) - - reading += 1 - time.sleep(10) - - logger.info(f"subprocessed ended with return code {ret.returncode}") - - logger.info(f"node reporter ending, passing on return code {ret.returncode} from workers") - sys.exit(ret.returncode) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index db0e76fe15..8719b0a8c7 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -14,7 +14,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.errors import MonitoringHubStartError -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue diff --git a/parsl/monitoring/node_reporter.py b/parsl/monitoring/node_reporter.py deleted file mode 100755 index db525e0317..0000000000 --- a/parsl/monitoring/node_reporter.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/env python3 - -"""This script is for sending a node report to monitoring at the start of a -batch job, immediately before launching a worker process. - -This is not needed for the HighThroughputExecutor, which contains node -reporting code internally. - -This expects to receive parameters that align with the parsl work queue worker -such as the block ID. 
-""" - -import logging -import os -import platform -import subprocess -import sys -import uuid -from datetime import datetime - -import psutil - -from parsl.log_utils import set_stream_logger -from parsl.monitoring.radios import FilesystemRadioSender - -logger = logging.getLogger("parsl.monitoring.node_reporter") - - -def send_msg(*, active, uid, radio): - - now = datetime.today() - - msg = { - 'run_id': 'TODO', # from commandline or environment? - 'hostname': platform.node(), - 'uid': uid, - 'block_id': os.getenv('PARSL_WORKER_BLOCK_ID', "unknown"), - 'cpu_count': psutil.cpu_count(logical=False), - 'total_memory': psutil.virtual_memory().total, - 'active': active, - 'worker_count': 1, # from commandline - 'python_v': "{}.{}.{}".format(sys.version_info.major, - sys.version_info.minor, - sys.version_info.micro), - - # TODO: what's the difference between these two? timestamp is when the - # message is sent, and last_heartbeat is when we last heard from the - # worker? in the non-heartbeating case, that probably means set them both - # to now. - 'timestamp': now, - 'last_heartbeat': now - } - - logger.debug(f"created message {msg}") - - radio.send(msg) - - # TODO: send dict - - -if __name__ == "__main__": - set_stream_logger() - logger.info(f"node reporter starting, with args {sys.argv}") - - logger.info("initializing radio") - - # initial WQ-specific radio here - perhaps this should be parameterisable? - # depending on if this will be wq specific and if wq transmission method - # will be configurable. - - # monitoring_hub_url is unused - could perhaps become a "contact string" - # that can specify many things? (and the url scheme could then specify - # the radio protocol?) - - # radios now need to cope with source_id not being a task ID - None here, - # but could be something else (eg the uid?) - what is it used for anyway? - - # and run_dir needs to be passed in somehow. 
wq workers probably don't get - # this by default because they're expecting to live in a different - # filesystem domain. - - # maybe i should hack this in really badly for now. - - run_dir = "/home/benc/parsl/src/parsl/runinfo/000/" # TODO at least get the real version of this value, no matter how badly - - radio = FilesystemRadioSender(monitoring_url="", # TODO: monitoring_hub_url and source_id real values? - run_dir=run_dir) - - uid = str(uuid.uuid4()) - - logger.info(f"node reporter uid = {uid}") - - send_msg(active=True, uid=uid, radio=radio) - - args = sys.argv[1:] - logger.info(f"node reporter launching workqueue with args {args}") - - ret = subprocess.run(args) - logger.info(f"subprocessed ended with return code {ret.returncode}") - - send_msg(active=False, uid=uid, radio=radio) - - logger.info(f"node reporter ending, passing on return code {ret.returncode}") - sys.exit(ret.returncode) diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py deleted file mode 100644 index 14dc046557..0000000000 --- a/parsl/monitoring/radios.py +++ /dev/null @@ -1,191 +0,0 @@ -import logging -import os -import pickle -import socket -import uuid -from abc import ABCMeta, abstractmethod -from multiprocessing.queues import Queue - -import zmq - -logger = logging.getLogger(__name__) - - -class MonitoringRadioSender(metaclass=ABCMeta): - @abstractmethod - def send(self, message: object) -> None: - pass - - -class FilesystemRadioSender(MonitoringRadioSender): - """A MonitoringRadioSender that sends messages over a shared filesystem. - - The messsage directory structure is based on maildir, - https://en.wikipedia.org/wiki/Maildir - - The writer creates a message in tmp/ and then when it is fully - written, moves it atomically into new/ - - The reader ignores tmp/ and only reads and deletes messages from - new/ - - This avoids a race condition of reading partially written messages. 
- - This radio is likely to give higher shared filesystem load compared to - the UDP radio, but should be much more reliable. - """ - - def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): - logger.info("filesystem based monitoring channel initializing") - self.base_path = f"{run_dir}/monitor-fs-radio/" - self.tmp_path = f"{self.base_path}/tmp" - self.new_path = f"{self.base_path}/new" - - os.makedirs(self.tmp_path, exist_ok=True) - os.makedirs(self.new_path, exist_ok=True) - - def send(self, message: object) -> None: - logger.info("Sending a monitoring message via filesystem") - - unique_id = str(uuid.uuid4()) - - tmp_filename = f"{self.tmp_path}/{unique_id}" - new_filename = f"{self.new_path}/{unique_id}" - buffer = message - - # this will write the message out then atomically - # move it into new/, so that a partially written - # file will never be observed in new/ - with open(tmp_filename, "wb") as f: - pickle.dump(buffer, f) - os.rename(tmp_filename, new_filename) - - -class HTEXRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - logger.info("htex-based monitoring channel initialising") - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - - import parsl.executors.high_throughput.monitoring_info - - result_queue = parsl.executors.high_throughput.monitoring_info.result_queue - - # this message needs to go in the result queue tagged so that it is treated - # i) as a monitoring message by the interchange, and then further more treated - # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO - # which is the implicit default for messages from the interchange) - - # for the interchange, 
the outer wrapper, this needs to be a dict: - - interchange_msg = { - 'type': 'monitoring', - 'payload': message - } - - if result_queue: - result_queue.put(pickle.dumps(interchange_msg)) - else: - logger.error("result_queue is uninitialized - cannot put monitoring message") - - return - - -class UDPRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - self.monitoring_url = monitoring_url - self.sock_timeout = timeout - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) - - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) # UDP - self.sock.settimeout(self.sock_timeout) - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - try: - buffer = pickle.dumps(message) - except Exception: - logging.exception("Exception during pickling", exc_info=True) - return - - try: - self.sock.sendto(buffer, (self.ip, self.port)) - except socket.timeout: - logging.error("Could not send message within timeout limit") - return - return - - -class MultiprocessingQueueRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over a multiprocessing Queue. - This radio is intended to be used on the submit side, where components - in the submit process, or processes launched by multiprocessing, will have - access to a Queue shared with the monitoring database code (bypassing the - monitoring router). 
- """ - def __init__(self, queue: Queue) -> None: - self.queue = queue - - def send(self, message: object) -> None: - self.queue.put(message) - - -class ZMQRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over ZMQ. This radio is not - thread-safe, because its use of ZMQ is not thread-safe. - """ - - def __init__(self, hub_address: str, hub_zmq_port: int) -> None: - self._hub_channel = zmq.Context().socket(zmq.DEALER) - self._hub_channel.set_hwm(0) - self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") - - def send(self, message: object) -> None: - self._hub_channel.send_pyobj(message) diff --git a/parsl/channels/local/__init__.py b/parsl/monitoring/radios/__init__.py similarity index 100% rename from parsl/channels/local/__init__.py rename to parsl/monitoring/radios/__init__.py diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py new file mode 100644 index 0000000000..2bb799f256 --- /dev/null +++ b/parsl/monitoring/radios/base.py @@ -0,0 +1,13 @@ +import logging +from abc import ABCMeta, abstractmethod +from typing import Optional + +_db_manager_excepts: Optional[Exception] + +logger = logging.getLogger(__name__) + + +class MonitoringRadioSender(metaclass=ABCMeta): + @abstractmethod + def send(self, message: object) -> None: + pass diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py new file mode 100644 index 0000000000..accff87d36 --- /dev/null +++ b/parsl/monitoring/radios/filesystem.py @@ -0,0 +1,52 @@ +import logging +import os +import pickle +import uuid + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. 
+ + The message directory structure is based on maildir, + https://en.wikipedia.org/wiki/Maildir + + The writer creates a message in tmp/ and then when it is fully + written, moves it atomically into new/ + + The reader ignores tmp/ and only reads and deletes messages from + new/ + + This avoids a race condition of reading partially written messages. + + This radio is likely to give higher shared filesystem load compared to + the UDP radio, but should be much more reliable. + """ + + def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): + logger.info("filesystem based monitoring channel initializing") + self.base_path = f"{run_dir}/monitor-fs-radio/" + self.tmp_path = f"{self.base_path}/tmp" + self.new_path = f"{self.base_path}/new" + + os.makedirs(self.tmp_path, exist_ok=True) + os.makedirs(self.new_path, exist_ok=True) + + def send(self, message: object) -> None: + logger.info("Sending a monitoring message via filesystem") + + unique_id = str(uuid.uuid4()) + + tmp_filename = f"{self.tmp_path}/{unique_id}" + new_filename = f"{self.new_path}/{unique_id}" + buffer = message + + # this will write the message out then atomically + # move it into new/, so that a partially written + # file will never be observed in new/ + with open(tmp_filename, "wb") as f: + pickle.dump(buffer, f) + os.rename(tmp_filename, new_filename) diff --git a/parsl/monitoring/radios/htex.py b/parsl/monitoring/radios/htex.py new file mode 100644 index 0000000000..bdb893b303 --- /dev/null +++ b/parsl/monitoring/radios/htex.py @@ -0,0 +1,57 @@ +import logging +import pickle + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class HTEXRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + logger.info("htex-based monitoring channel initialising") + 
+ def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + + import parsl.executors.high_throughput.monitoring_info + + result_queue = parsl.executors.high_throughput.monitoring_info.result_queue + + # this message needs to go in the result queue tagged so that it is treated + # i) as a monitoring message by the interchange, and then further more treated + # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO + # which is the implicit default for messages from the interchange) + + # for the interchange, the outer wrapper, this needs to be a dict: + + interchange_msg = { + 'type': 'monitoring', + 'payload': message + } + + if result_queue: + result_queue.put(pickle.dumps(interchange_msg)) + else: + logger.error("result_queue is uninitialized - cannot put monitoring message") + + return diff --git a/parsl/monitoring/radios/multiprocessing.py b/parsl/monitoring/radios/multiprocessing.py new file mode 100644 index 0000000000..6274bbfca8 --- /dev/null +++ b/parsl/monitoring/radios/multiprocessing.py @@ -0,0 +1,17 @@ +from multiprocessing.queues import Queue + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class MultiprocessingQueueRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). 
+ """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put(message) diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py new file mode 100644 index 0000000000..f2a652e9ac --- /dev/null +++ b/parsl/monitoring/radios/udp.py @@ -0,0 +1,56 @@ +import logging +import pickle +import socket + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class UDPRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + self.monitoring_url = monitoring_url + self.sock_timeout = timeout + try: + self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) + self.port = int(port) + except Exception: + raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) + + self.sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) # UDP + self.sock.settimeout(self.sock_timeout) + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + try: + buffer = pickle.dumps(message) + except Exception: + logging.exception("Exception during pickling", exc_info=True) + return + + try: + self.sock.sendto(buffer, (self.ip, self.port)) + except socket.timeout: + logging.error("Could not send message within timeout limit") + return + return diff --git a/parsl/monitoring/radios/zmq.py b/parsl/monitoring/radios/zmq.py new file mode 100644 index 0000000000..397c943568 --- /dev/null +++ b/parsl/monitoring/radios/zmq.py @@ -0,0 +1,17 @@ +import zmq + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. 
This radio is not + thread-safe, because its use of ZMQ is not thread-safe. + """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d72b54dc3c..530b39f935 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -7,12 +7,10 @@ from typing import Any, Callable, Dict, List, Sequence, Tuple from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import ( - FilesystemRadioSender, - HTEXRadioSender, - MonitoringRadioSender, - UDPRadioSender, -) +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.filesystem import FilesystemRadioSender +from parsl.monitoring.radios.htex import HTEXRadioSender +from parsl.monitoring.radios.udp import UDPRadioSender from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 04e7480a7a..0926712c36 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,7 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle diff --git a/parsl/providers/cluster_provider.py b/parsl/providers/cluster_provider.py index 6bc76bdf22..db7ef9eaac 100644 --- a/parsl/providers/cluster_provider.py +++ b/parsl/providers/cluster_provider.py @@ -6,6 +6,7 @@ from parsl.launchers.errors import BadLauncher from 
parsl.providers.base import ExecutionProvider from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError +from parsl.utils import execute_wait logger = logging.getLogger(__name__) @@ -17,8 +18,6 @@ class ClusterProvider(ExecutionProvider): ---------- label : str Label for this provider. - channel : Channel - Channel for accessing this provider. walltime : str Walltime requested per block in HH:MM:SS. launcher : Launcher @@ -44,7 +43,6 @@ class ClusterProvider(ExecutionProvider): def __init__(self, label, - channel, nodes_per_block, init_blocks, min_blocks, @@ -55,7 +53,6 @@ def __init__(self, cmd_timeout=10): self._label = label - self.channel = channel self.nodes_per_block = nodes_per_block self.init_blocks = init_blocks self.min_blocks = min_blocks @@ -76,7 +73,7 @@ def execute_wait(self, cmd, timeout=None): t = self.cmd_timeout if timeout is not None: t = timeout - return self.channel.execute_wait(cmd, t) + return execute_wait(cmd, t) def _write_submit_script(self, template, script_filename, job_name, configs): """Generate submit script and write it to a file. diff --git a/parsl/providers/condor/condor.py b/parsl/providers/condor/condor.py index c8142c4026..150c72dfe7 100644 --- a/parsl/providers/condor/condor.py +++ b/parsl/providers/condor/condor.py @@ -5,7 +5,6 @@ import typeguard -from parsl.channels import LocalChannel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.launchers.base import Launcher @@ -18,8 +17,6 @@ from typing import Dict, List, Optional -from parsl.channels.base import Channel - # See http://pages.cs.wisc.edu/~adesmet/status.html translate_table = { '1': JobState.PENDING, @@ -36,8 +33,6 @@ class CondorProvider(RepresentationMixin, ClusterProvider): Parameters ---------- - channel : Channel - Channel for accessing this provider. nodes_per_block : int Nodes to provision per block. 
cores_per_slot : int @@ -79,7 +74,6 @@ class CondorProvider(RepresentationMixin, ClusterProvider): """ @typeguard.typechecked def __init__(self, - channel: Channel = LocalChannel(), nodes_per_block: int = 1, cores_per_slot: Optional[int] = None, mem_per_slot: Optional[float] = None, @@ -100,7 +94,6 @@ def __init__(self, label = 'condor' super().__init__(label, - channel, nodes_per_block, init_blocks, min_blocks, @@ -226,7 +219,7 @@ def submit(self, command, tasks_per_node, job_name="parsl.condor"): job_config = {} job_config["job_name"] = job_name - job_config["submit_script_dir"] = self.channel.script_dir + job_config["submit_script_dir"] = self.script_dir job_config["project"] = self.project job_config["nodes"] = self.nodes_per_block job_config["scheduler_options"] = scheduler_options diff --git a/parsl/providers/grid_engine/grid_engine.py b/parsl/providers/grid_engine/grid_engine.py index ddedcaa3e8..b01e86cd5d 100644 --- a/parsl/providers/grid_engine/grid_engine.py +++ b/parsl/providers/grid_engine/grid_engine.py @@ -2,7 +2,6 @@ import os import time -from parsl.channels import LocalChannel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.providers.cluster_provider import ClusterProvider @@ -36,8 +35,6 @@ class GridEngineProvider(ClusterProvider, RepresentationMixin): Parameters ---------- - channel : Channel - Channel for accessing this provider. nodes_per_block : int Nodes to provision per block. 
min_blocks : int @@ -62,7 +59,6 @@ class GridEngineProvider(ClusterProvider, RepresentationMixin): """ def __init__(self, - channel=LocalChannel(), nodes_per_block=1, init_blocks=1, min_blocks=0, @@ -76,7 +72,6 @@ def __init__(self, queue=None): label = 'grid_engine' super().__init__(label, - channel, nodes_per_block, init_blocks, min_blocks, @@ -100,7 +95,7 @@ def get_configs(self, command, tasks_per_node): self.nodes_per_block, tasks_per_node)) job_config = {} - job_config["submit_script_dir"] = self.channel.script_dir + job_config["submit_script_dir"] = self.script_dir job_config["nodes"] = self.nodes_per_block job_config["walltime"] = self.walltime job_config["scheduler_options"] = self.scheduler_options diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index 5ecf174df2..55994c31c3 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -2,7 +2,6 @@ import os import time -from parsl.channels import LocalChannel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.providers.base import ExecutionProvider @@ -11,7 +10,7 @@ ScriptPathError, SubmitException, ) -from parsl.utils import RepresentationMixin +from parsl.utils import RepresentationMixin, execute_wait logger = logging.getLogger(__name__) @@ -37,7 +36,6 @@ class LocalProvider(ExecutionProvider, RepresentationMixin): """ def __init__(self, - channel=LocalChannel(), nodes_per_block=1, launcher=SingleNodeLauncher(), init_blocks=1, @@ -46,7 +44,6 @@ def __init__(self, worker_init='', cmd_timeout=30, parallelism=1): - self.channel = channel self._label = 'local' self.nodes_per_block = nodes_per_block self.launcher = launcher @@ -118,7 +115,7 @@ def status(self, job_ids): return [self.resources[jid]['status'] for jid in job_ids] def _is_alive(self, job_dict): - retcode, stdout, stderr = self.channel.execute_wait( + retcode, stdout, stderr = execute_wait( 'ps -p {} > /dev/null 2> /dev/null; echo 
"STATUS:$?" '.format( job_dict['remote_pid']), self.cmd_timeout) for line in stdout.split('\n'): @@ -223,11 +220,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): # cancel the task later. # # We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise - # channel.execute_wait hangs reading the process stdout until all the + # execute_wait hangs reading the process stdout until all the # background commands complete. cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \ 'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode), stdout, stderr) @@ -258,7 +255,7 @@ def cancel(self, job_ids): job_dict['cancelled'] = True logger.debug("Terminating job/process ID: {0}".format(job)) cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid']) - retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout) + retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout) if retcode != 0: logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'], self.label)) diff --git a/parsl/providers/lsf/lsf.py b/parsl/providers/lsf/lsf.py index b446b063a4..f2d3f88652 100644 --- a/parsl/providers/lsf/lsf.py +++ b/parsl/providers/lsf/lsf.py @@ -3,7 +3,6 @@ import os import time -from parsl.channels import LocalChannel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.providers.cluster_provider import ClusterProvider @@ -32,8 +31,6 @@ class LSFProvider(ClusterProvider, RepresentationMixin): Parameters ---------- - channel : Channel - Channel for accessing this provider. nodes_per_block : int Nodes to provision per block. 
When request_by_nodes is False, it is computed by cores_per_block / cores_per_node. @@ -77,7 +74,6 @@ class LSFProvider(ClusterProvider, RepresentationMixin): """ def __init__(self, - channel=LocalChannel(), nodes_per_block=1, cores_per_block=None, cores_per_node=None, @@ -96,7 +92,6 @@ def __init__(self, launcher=SingleNodeLauncher()): label = 'LSF' super().__init__(label, - channel, nodes_per_block, init_blocks, min_blocks, @@ -211,7 +206,7 @@ def submit(self, command, tasks_per_node, job_name="parsl.lsf"): logger.debug("Requesting one block with {} nodes".format(self.nodes_per_block)) job_config = {} - job_config["submit_script_dir"] = self.channel.script_dir + job_config["submit_script_dir"] = self.script_dir job_config["nodes"] = self.nodes_per_block job_config["tasks_per_node"] = tasks_per_node job_config["walltime"] = wtime_to_minutes(self.walltime) diff --git a/parsl/providers/pbspro/pbspro.py b/parsl/providers/pbspro/pbspro.py index 71c958f000..aa2e5e2f54 100644 --- a/parsl/providers/pbspro/pbspro.py +++ b/parsl/providers/pbspro/pbspro.py @@ -3,7 +3,6 @@ import os import time -from parsl.channels import LocalChannel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.providers.pbspro.template import template_string @@ -17,8 +16,6 @@ class PBSProProvider(TorqueProvider): Parameters ---------- - channel : Channel - Channel for accessing this provider. account : str Account the job will be charged against. queue : str @@ -51,7 +48,6 @@ class PBSProProvider(TorqueProvider): :class:`~parsl.launchers.SingleNodeLauncher`. 
""" def __init__(self, - channel=LocalChannel(), account=None, queue=None, scheduler_options='', @@ -66,8 +62,7 @@ def __init__(self, launcher=SingleNodeLauncher(), walltime="00:20:00", cmd_timeout=120): - super().__init__(channel, - account, + super().__init__(account, queue, scheduler_options, worker_init, @@ -159,7 +154,7 @@ def submit(self, command, tasks_per_node, job_name="parsl"): ) job_config = {} - job_config["submit_script_dir"] = self.channel.script_dir + job_config["submit_script_dir"] = self.script_dir job_config["nodes_per_block"] = self.nodes_per_block job_config["ncpus"] = self.cpus_per_node job_config["walltime"] = self.walltime diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index 9b6f38b9d9..cf50058522 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -3,12 +3,10 @@ import os import re import time -from typing import Optional +from typing import Any, Dict, Optional import typeguard -from parsl.channels import LocalChannel -from parsl.channels.base import Channel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.launchers.base import Launcher @@ -73,8 +71,6 @@ class SlurmProvider(ClusterProvider, RepresentationMixin): clusters : str Slurm cluster name, or comma seperated cluster list, used to choose between different clusters in a federated Slurm instance. If unspecified or ``None``, no slurm directive for clusters will be added. - channel : Channel - Channel for accessing this provider. nodes_per_block : int Nodes to provision per block. 
cores_per_node : int @@ -119,7 +115,6 @@ def __init__(self, qos: Optional[str] = None, constraint: Optional[str] = None, clusters: Optional[str] = None, - channel: Channel = LocalChannel(), nodes_per_block: int = 1, cores_per_node: Optional[int] = None, mem_per_node: Optional[int] = None, @@ -136,7 +131,6 @@ def __init__(self, launcher: Launcher = SingleNodeLauncher()): label = 'slurm' super().__init__(label, - channel, nodes_per_block, init_blocks, min_blocks, @@ -286,8 +280,8 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s logger.debug("Requesting one block with {} nodes".format(self.nodes_per_block)) - job_config = {} - job_config["submit_script_dir"] = self.channel.script_dir + job_config: Dict[str, Any] = {} + job_config["submit_script_dir"] = self.script_dir job_config["nodes"] = self.nodes_per_block job_config["tasks_per_node"] = tasks_per_node job_config["walltime"] = wtime_to_minutes(self.walltime) diff --git a/parsl/providers/torque/torque.py b/parsl/providers/torque/torque.py index 7992893abb..6958c8348a 100644 --- a/parsl/providers/torque/torque.py +++ b/parsl/providers/torque/torque.py @@ -2,7 +2,6 @@ import os import time -from parsl.channels import LocalChannel from parsl.jobs.states import JobState, JobStatus from parsl.launchers import AprunLauncher from parsl.providers.cluster_provider import ClusterProvider @@ -33,8 +32,6 @@ class TorqueProvider(ClusterProvider, RepresentationMixin): Parameters ---------- - channel : Channel - Channel for accessing this provider. account : str Account the job will be charged against. 
queue : str @@ -65,7 +62,6 @@ class TorqueProvider(ClusterProvider, RepresentationMixin): """ def __init__(self, - channel=LocalChannel(), account=None, queue=None, scheduler_options='', @@ -80,7 +76,6 @@ def __init__(self, cmd_timeout=120): label = 'torque' super().__init__(label, - channel, nodes_per_block, init_blocks, min_blocks, @@ -170,8 +165,7 @@ def submit(self, command, tasks_per_node, job_name="parsl.torque"): tasks_per_node) job_config = {} - # TODO : script_path might need to change to accommodate script dir set via channels - job_config["submit_script_dir"] = self.channel.script_dir + job_config["submit_script_dir"] = self.script_dir job_config["nodes"] = self.nodes_per_block job_config["task_blocks"] = self.nodes_per_block * tasks_per_node job_config["nodes_per_block"] = self.nodes_per_block diff --git a/parsl/tests/configs/cc_in2p3.py b/parsl/tests/configs/cc_in2p3.py index 38c817cccd..5d9edf9d0c 100644 --- a/parsl/tests/configs/cc_in2p3.py +++ b/parsl/tests/configs/cc_in2p3.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import GridEngineProvider @@ -14,7 +13,6 @@ def fresh_config(): max_workers_per_node=1, encrypted=True, provider=GridEngineProvider( - channel=LocalChannel(), nodes_per_block=2, init_blocks=2, max_blocks=2, diff --git a/parsl/tests/configs/frontera.py b/parsl/tests/configs/frontera.py index 537d6f78e0..41e2985e41 100644 --- a/parsl/tests/configs/frontera.py +++ b/parsl/tests/configs/frontera.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -20,7 +19,6 @@ def fresh_config(): encrypted=True, provider=SlurmProvider( cmd_timeout=60, # Add extra time for slow scheduler responses - channel=LocalChannel(), nodes_per_block=2, init_blocks=1, min_blocks=1, diff --git 
a/parsl/tests/configs/htex_local.py b/parsl/tests/configs/htex_local.py index 3dae57b758..712570b628 100644 --- a/parsl/tests/configs/htex_local.py +++ b/parsl/tests/configs/htex_local.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SimpleLauncher @@ -15,7 +14,6 @@ def fresh_config(): cores_per_worker=1, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, launcher=SimpleLauncher(), diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index b68c530c62..a763843c9d 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -17,14 +17,12 @@ import os -from parsl.channels import LocalChannel from parsl.config import Config from parsl.data_provider.file_noop import NoOpFileStaging from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.http import HTTPInTaskStaging from parsl.data_provider.zip import ZipFileStaging from parsl.executors import HighThroughputExecutor -from parsl.executors.high_throughput.executor import DEFAULT_LAUNCH_CMD from parsl.launchers import SingleNodeLauncher # imports for monitoring: @@ -39,7 +37,6 @@ def fresh_config(): executors=[ HighThroughputExecutor( address="127.0.0.1", - launch_cmd="davidadams_reporter.py /tmp/ " + DEFAULT_LAUNCH_CMD, label="htex_Local", working_dir=working_dir, storage_access=[ZipFileStaging(), FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], @@ -50,7 +47,6 @@ def fresh_config(): poll_period=100, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=0, min_blocks=0, max_blocks=5, @@ -60,11 +56,11 @@ def fresh_config(): ) ], strategy='simple', + strategy_period=0.5, app_cache=True, checkpoint_mode='task_exit', retries=2, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, 
monitoring_debug=False, resource_monitoring_interval=1, ), diff --git a/parsl/tests/configs/htex_local_intask_staging.py b/parsl/tests/configs/htex_local_intask_staging.py index eef88ea2e1..69649b2bcb 100644 --- a/parsl/tests/configs/htex_local_intask_staging.py +++ b/parsl/tests/configs/htex_local_intask_staging.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.data_provider.file_noop import NoOpFileStaging from parsl.data_provider.ftp import FTPInTaskStaging @@ -15,7 +14,6 @@ cores_per_worker=1, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, launcher=SimpleLauncher(), diff --git a/parsl/tests/configs/htex_local_rsync_staging.py b/parsl/tests/configs/htex_local_rsync_staging.py index 044d2b9f89..d24b1da66b 100644 --- a/parsl/tests/configs/htex_local_rsync_staging.py +++ b/parsl/tests/configs/htex_local_rsync_staging.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.http import HTTPInTaskStaging @@ -16,7 +15,6 @@ working_dir="./rsync-workdir/", encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, launcher=SimpleLauncher(), diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 4edc329095..cf4281a384 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -18,7 +18,6 @@ def fresh_config(): return Config(executors=[executor], monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, resource_monitoring_interval=3, ) ) diff --git a/parsl/tests/configs/slurm_local.py b/parsl/tests/configs/slurm_local.py index 2a63f68e51..c0281cf340 100644 --- a/parsl/tests/configs/slurm_local.py +++ b/parsl/tests/configs/slurm_local.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel 
from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -13,7 +12,6 @@ def fresh_config(): encrypted=True, provider=SlurmProvider( cmd_timeout=60, # Add extra time for slow scheduler responses - channel=LocalChannel(), nodes_per_block=1, init_blocks=1, min_blocks=1, diff --git a/parsl/tests/configs/workqueue_ex.py b/parsl/tests/configs/workqueue_ex.py index a53c5ea769..5df64d6a5c 100644 --- a/parsl/tests/configs/workqueue_ex.py +++ b/parsl/tests/configs/workqueue_ex.py @@ -4,14 +4,12 @@ from parsl.data_provider.http import HTTPInTaskStaging from parsl.executors import WorkQueueExecutor from parsl.monitoring import MonitoringHub -from parsl.providers import LocalProvider def fresh_config(): return Config(executors=[WorkQueueExecutor(port=9000, - coprocess=True, - provider=LocalProvider(init_blocks=0, min_blocks=0, max_blocks=1) - )], + coprocess=True)], + strategy_period=0.5, monitoring=MonitoringHub(hub_address="localhost", hub_port=55055, monitoring_debug=True, diff --git a/parsl/tests/integration/test_channels/__init__.py b/parsl/tests/integration/test_channels/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/tests/integration/test_channels/remote_run.sh b/parsl/tests/integration/test_channels/remote_run.sh deleted file mode 100644 index aa4945d859..0000000000 --- a/parsl/tests/integration/test_channels/remote_run.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -echo "Hostname: $HOSTNAME" -echo "Cpu info -----" -cat /proc/cpuinfo -echo "Done----------" diff --git a/parsl/tests/manual_tests/htex_local.py b/parsl/tests/manual_tests/htex_local.py index bcfdbf34ec..e85da878b1 100644 --- a/parsl/tests/manual_tests/htex_local.py +++ b/parsl/tests/manual_tests/htex_local.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor @@ -15,7 +14,6 @@ cores_per_worker=1, encrypted=True, 
provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, # tasks_per_node=1, # For HighThroughputExecutor, this option should in most cases be 1 diff --git a/parsl/tests/manual_tests/test_memory_limits.py b/parsl/tests/manual_tests/test_memory_limits.py index b353ba0929..f98a779e82 100644 --- a/parsl/tests/manual_tests/test_memory_limits.py +++ b/parsl/tests/manual_tests/test_memory_limits.py @@ -5,7 +5,6 @@ import parsl from parsl.app.app import python_app # , bash_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SingleNodeLauncher @@ -30,7 +29,6 @@ def test_simple(mem_per_worker): suppress_failure=True, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, launcher=SingleNodeLauncher(), diff --git a/parsl/tests/manual_tests/test_udp_simple.py b/parsl/tests/manual_tests/test_udp_simple.py index 8de257d8fa..847f8803bf 100644 --- a/parsl/tests/manual_tests/test_udp_simple.py +++ b/parsl/tests/manual_tests/test_udp_simple.py @@ -15,7 +15,6 @@ def local_setup(): ], monitoring=MonitoringHub( hub_address="127.0.0.1", - hub_port=55055, logging_level=logging.INFO, resource_monitoring_interval=10)) diff --git a/parsl/tests/scaling_tests/htex_local.py b/parsl/tests/scaling_tests/htex_local.py index f2bcf86963..f16f46af23 100644 --- a/parsl/tests/scaling_tests/htex_local.py +++ b/parsl/tests/scaling_tests/htex_local.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -12,7 +11,6 @@ max_workers_per_node=8, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), diff --git a/parsl/tests/sites/test_affinity.py b/parsl/tests/sites/test_affinity.py index 50d08ce830..792d490e03 100644 --- a/parsl/tests/sites/test_affinity.py +++ 
b/parsl/tests/sites/test_affinity.py @@ -5,7 +5,6 @@ import pytest from parsl import python_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -22,7 +21,6 @@ def local_config(): available_accelerators=2, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), diff --git a/parsl/tests/sites/test_worker_info.py b/parsl/tests/sites/test_worker_info.py index 4d9865da84..e55064a507 100644 --- a/parsl/tests/sites/test_worker_info.py +++ b/parsl/tests/sites/test_worker_info.py @@ -3,7 +3,6 @@ import pytest from parsl import python_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -18,7 +17,6 @@ def local_config(): max_workers_per_node=4, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), diff --git a/parsl/tests/test_channels/__init__.py b/parsl/tests/test_channels/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_channels/test_large_output.py deleted file mode 100644 index bfc96f38bc..0000000000 --- a/parsl/tests/test_channels/test_large_output.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytest - -from parsl.channels.local.local import LocalChannel - - -@pytest.mark.local -def test_local_large_output_2210(): - """Regression test for #2210. - The local channel was hanging if the specified command gave too - much output, due to a race condition between process exiting and - pipes filling up. - """ - - c = LocalChannel() - - # this will output 128kb of stdout - c.execute_wait("yes | dd count=128 bs=1024", walltime=60) - - # if this test fails, execute_wait should raise a timeout - # exception. 
- - # The contents out the output is not verified by this test diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py deleted file mode 100644 index a3f55d096d..0000000000 --- a/parsl/tests/test_channels/test_local_channel.py +++ /dev/null @@ -1,19 +0,0 @@ -import pytest - -from parsl.channels.local.local import LocalChannel - - -@pytest.mark.local -def test_env(): - ''' Regression testing for issue #27 - ''' - - lc = LocalChannel() - rc, stdout, stderr = lc.execute_wait("env", 1) - - stdout = stdout.split('\n') - x = [s for s in stdout if s.startswith("PATH=")] - assert x, "PATH not found" - - x = [s for s in stdout if s.startswith("HOME=")] - assert x, "HOME not found" diff --git a/parsl/tests/test_htex/test_drain.py b/parsl/tests/test_htex/test_drain.py index efd0405863..663978f4c8 100644 --- a/parsl/tests/test_htex/test_drain.py +++ b/parsl/tests/test_htex/test_drain.py @@ -3,7 +3,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SimpleLauncher @@ -28,7 +27,6 @@ def local_config(): cores_per_worker=1, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, min_blocks=0, max_blocks=0, diff --git a/parsl/tests/test_htex/test_manager_selector_by_block.py b/parsl/tests/test_htex/test_manager_selector_by_block.py index 0933b581ff..1b2a4ee1f7 100644 --- a/parsl/tests/test_htex/test_manager_selector_by_block.py +++ b/parsl/tests/test_htex/test_manager_selector_by_block.py @@ -4,7 +4,6 @@ import parsl from parsl.app.app import bash_app, python_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.executors.high_throughput.manager_selector import ( @@ -31,7 +30,6 @@ def test_block_id_selection(try_assert): max_workers_per_node=1, manager_selector=BlockIdManagerSelector(), 
provider=LocalProvider( - channel=LocalChannel(), init_blocks=BLOCK_COUNT, max_blocks=BLOCK_COUNT, min_blocks=BLOCK_COUNT, diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index 9ffa21df01..507d569f7b 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -12,11 +12,9 @@ @parsl.python_app def this_app(): - # this delay needs to be several times the resource monitoring - # period configured in the test configuration, so that some - # messages are actually sent - there is no guarantee that any - # (non-first) resource message will be sent at all for a short app. - time.sleep(3) + # TODO: deleted this sleep because we will always send a final resource message + # rather than requiring polling to happen - since TODO PR ##### + # time.sleep(3) return 5 @@ -66,6 +64,7 @@ def workqueue_config(): def taskvine_config(): c = Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), worker_launch_method='provider')], + strategy_period=0.5, monitoring=MonitoringHub(hub_address="localhost", resource_monitoring_interval=1)) @@ -88,8 +87,12 @@ def test_row_counts(tmpd_cwd, fresh_config): config.run_dir = tmpd_cwd config.monitoring.logging_endpoint = db_url + print(f"load {time.time()}") with parsl.load(config): + print(f"start {time.time()}") assert this_app().result() == 5 + print(f"end {time.time()}") + print(f"unload {time.time()}") # at this point, we should find one row in the monitoring database. 
diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py index ada972e747..67db42671c 100644 --- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py +++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py @@ -5,7 +5,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SimpleLauncher @@ -22,7 +21,6 @@ def fresh_config(run_dir, strategy, db_url): cores_per_worker=1, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, # min and max are set to 0 to ensure that we don't get # a block from ongoing strategy scaling, only from @@ -37,7 +35,6 @@ def fresh_config(run_dir, strategy, db_url): strategy_period=0.1, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, logging_endpoint=db_url ) ) diff --git a/parsl/tests/test_monitoring/test_stdouterr.py b/parsl/tests/test_monitoring/test_stdouterr.py index d1817164c0..8e1935045f 100644 --- a/parsl/tests/test_monitoring/test_stdouterr.py +++ b/parsl/tests/test_monitoring/test_stdouterr.py @@ -37,7 +37,6 @@ def fresh_config(run_dir): strategy_period=0.1, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, ) ) diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py index 8d56d89ddf..c750814463 100644 --- a/parsl/tests/test_providers/test_local_provider.py +++ b/parsl/tests/test_providers/test_local_provider.py @@ -11,7 +11,6 @@ import pytest -from parsl.channels import LocalChannel from parsl.jobs.states import JobState from parsl.launchers import SingleNodeLauncher from parsl.providers import LocalProvider @@ -63,7 +62,7 @@ def _run_tests(p: LocalProvider): def test_local_channel(): with tempfile.TemporaryDirectory() as script_dir: script_dir = 
tempfile.mkdtemp() - p = LocalProvider(channel=LocalChannel(), launcher=SingleNodeLauncher(debug=False)) + p = LocalProvider(launcher=SingleNodeLauncher(debug=False)) p.script_dir = script_dir _run_tests(p) diff --git a/parsl/tests/test_providers/test_pbspro_template.py b/parsl/tests/test_providers/test_pbspro_template.py index dec987ccbb..1264731120 100644 --- a/parsl/tests/test_providers/test_pbspro_template.py +++ b/parsl/tests/test_providers/test_pbspro_template.py @@ -3,7 +3,6 @@ import pytest -from parsl.channels import LocalChannel from parsl.providers import PBSProProvider @@ -12,10 +11,9 @@ def test_submit_script_basic(tmp_path): """Test slurm resources table""" provider = PBSProProvider( - queue="debug", channel=LocalChannel() + queue="debug" ) provider.script_dir = tmp_path - provider.channel.script_dir = tmp_path job_id = str(random.randint(55000, 59000)) provider.execute_wait = mock.Mock(spec=PBSProProvider.execute_wait) provider.execute_wait.return_value = (0, job_id, "") diff --git a/parsl/tests/test_providers/test_slurm_template.py b/parsl/tests/test_providers/test_slurm_template.py index 57fd8e4d0b..55074fefe3 100644 --- a/parsl/tests/test_providers/test_slurm_template.py +++ b/parsl/tests/test_providers/test_slurm_template.py @@ -4,7 +4,6 @@ import pytest -from parsl.channels import LocalChannel from parsl.providers import SlurmProvider @@ -13,10 +12,9 @@ def test_submit_script_basic(tmp_path): """Test slurm resources table""" provider = SlurmProvider( - partition="debug", channel=LocalChannel() + partition="debug" ) provider.script_dir = tmp_path - provider.channel.script_dir = tmp_path job_id = str(random.randint(55000, 59000)) provider.execute_wait = mock.MagicMock(spec=SlurmProvider.execute_wait) provider.execute_wait.return_value = (0, f"Submitted batch job {job_id}", "") diff --git a/parsl/tests/test_scaling/test_regression_1621.py b/parsl/tests/test_scaling/test_regression_1621.py index 5aec750068..ea7d8f2631 100644 --- 
a/parsl/tests/test_scaling/test_regression_1621.py +++ b/parsl/tests/test_scaling/test_regression_1621.py @@ -3,7 +3,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SimpleLauncher @@ -43,7 +42,6 @@ def test_one_block(tmpd_cwd): one app is invoked. this is a regression test. """ oneshot_provider = OneShotLocalProvider( - channel=LocalChannel(), init_blocks=0, min_blocks=0, max_blocks=10, diff --git a/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py index a56b53af10..0c4a474b19 100644 --- a/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py +++ b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py @@ -3,7 +3,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import WrappedLauncher diff --git a/parsl/tests/test_scaling/test_scale_down.py b/parsl/tests/test_scaling/test_scale_down.py index 7fb72ba507..e50667dd78 100644 --- a/parsl/tests/test_scaling/test_scale_down.py +++ b/parsl/tests/test_scaling/test_scale_down.py @@ -5,7 +5,6 @@ import parsl from parsl import File, python_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SingleNodeLauncher @@ -29,7 +28,6 @@ def local_config(): max_workers_per_node=1, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=0, max_blocks=_max_blocks, min_blocks=_min_blocks, @@ -39,6 +37,7 @@ def local_config(): ], max_idletime=0.5, strategy='simple', + strategy_period=0.5, ) diff --git a/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py b/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py index 
016a51dc48..831bdf82af 100644 --- a/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py +++ b/parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py @@ -4,7 +4,6 @@ import parsl from parsl import File, python_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SingleNodeLauncher @@ -26,7 +25,6 @@ def local_config(): max_workers_per_node=1, encrypted=True, provider=LocalProvider( - channel=LocalChannel(), init_blocks=0, max_blocks=_max_blocks, min_blocks=_min_blocks, diff --git a/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py b/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py index 529877eac7..90a9b9ff1b 100644 --- a/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py +++ b/parsl/tests/test_scaling/test_scale_down_htex_unregistered.py @@ -5,7 +5,6 @@ import parsl from parsl import File, python_app -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.jobs.states import TERMINAL_STATES, JobState @@ -31,7 +30,6 @@ def local_config(): encrypted=True, launch_cmd="sleep inf", provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=_max_blocks, min_blocks=_min_blocks, diff --git a/parsl/tests/test_scaling/test_shutdown_scalein.py b/parsl/tests/test_scaling/test_shutdown_scalein.py index 1d1557ebb1..2505c79aca 100644 --- a/parsl/tests/test_scaling/test_shutdown_scalein.py +++ b/parsl/tests/test_scaling/test_shutdown_scalein.py @@ -4,7 +4,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SimpleLauncher @@ -47,7 +46,6 @@ def test_shutdown_scalein_blocks(tmpd_cwd, try_assert): scaled in at DFK shutdown. 
""" accumulating_provider = AccumulatingLocalProvider( - channel=LocalChannel(), init_blocks=BLOCK_COUNT, min_blocks=0, max_blocks=0, diff --git a/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py b/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py index eee128634e..414f67cab6 100644 --- a/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py +++ b/parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py @@ -6,7 +6,6 @@ import zmq import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.launchers import SimpleLauncher @@ -24,7 +23,6 @@ def fresh_config(): cores_per_worker=1, encrypted=False, provider=LocalProvider( - channel=LocalChannel(), init_blocks=0, min_blocks=0, max_blocks=0, diff --git a/parsl/tests/test_staging/test_zip_in.py b/parsl/tests/test_staging/test_zip_in.py index 1f74f7e11b..9d43a4ab49 100644 --- a/parsl/tests/test_staging/test_zip_in.py +++ b/parsl/tests/test_staging/test_zip_in.py @@ -4,7 +4,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.data_provider.files import File from parsl.data_provider.zip import ZipAuthorityError, ZipFileStaging diff --git a/parsl/tests/test_staging/test_zip_out.py b/parsl/tests/test_staging/test_zip_out.py index e369031033..79fbb504d5 100644 --- a/parsl/tests/test_staging/test_zip_out.py +++ b/parsl/tests/test_staging/test_zip_out.py @@ -3,7 +3,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.data_provider.data_manager import default_staging from parsl.data_provider.files import File diff --git a/parsl/tests/test_staging/test_zip_to_zip.py b/parsl/tests/test_staging/test_zip_to_zip.py index 2c78e3bec2..3fea42167c 100644 --- a/parsl/tests/test_staging/test_zip_to_zip.py +++ b/parsl/tests/test_staging/test_zip_to_zip.py @@ -4,7 
+4,6 @@ import pytest import parsl -from parsl.channels import LocalChannel from parsl.config import Config from parsl.data_provider.files import File from parsl.data_provider.zip import ZipAuthorityError, ZipFileStaging diff --git a/parsl/tests/test_utils/test_execute_wait.py b/parsl/tests/test_utils/test_execute_wait.py new file mode 100644 index 0000000000..44488c239c --- /dev/null +++ b/parsl/tests/test_utils/test_execute_wait.py @@ -0,0 +1,35 @@ +import pytest + +from parsl.utils import execute_wait + + +@pytest.mark.local +def test_env(): + ''' Regression testing for issue #27 + ''' + + rc, stdout, stderr = execute_wait("env", 1) + + stdout = stdout.split('\n') + x = [s for s in stdout if s.startswith("PATH=")] + assert x, "PATH not found" + + x = [s for s in stdout if s.startswith("HOME=")] + assert x, "HOME not found" + + +@pytest.mark.local +def test_large_output_2210(): + """Regression test for #2210. + execute_wait was hanging if the specified command gave too + much output, due to a race condition between process exiting and + pipes filling up. + """ + + # this will output 128kb of stdout + execute_wait("yes | dd count=128 bs=1024", walltime=60) + + # if this test fails, execute_wait should raise a timeout + # exception. 
+ + # The contents of the output are not verified by this test diff --git a/parsl/utils.py index 0ea5d7d9eb..d8e7c81e9e 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -340,6 +340,8 @@ def __init__(self, callback: Callable, *args: Any, interval: Union[float, int] = self._thread.start() def _wake_up_timer(self) -> None: + self.make_callback() + while not self._kill_event.wait(self.interval): self.make_callback() @@ -458,3 +460,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") return sanitized + + +def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]: + ''' Synchronously execute a commandline string on the shell. + + Args: + - cmd (string) : Commandline string to execute + - walltime (int) : walltime in seconds + + Returns: + - retcode : Return code from the execution + - stdout : stdout string + - stderr : stderr string + ''' + try: + logger.debug("Creating process with command '%s'", cmd) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + preexec_fn=os.setpgrp + ) + logger.debug("Created process with pid %s. Performing communicate", proc.pid) + (stdout, stderr) = proc.communicate(timeout=walltime) + retcode = proc.returncode + logger.debug("Process %s returned %s", proc.pid, proc.returncode) + + except Exception: + logger.exception(f"Execution of command failed:\n{cmd}") + raise + else: + logger.debug("Execution of command in process %s completed normally", proc.pid) + + return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) diff --git a/parsl/version.py index f856f6577d..e54c24ec3a 100644 --- a/parsl/version.py +++ b/parsl/version.py @@ -1,3 +1,3 @@ """Set module version. 
""" -VERSION = '2024.11.25+desc-2024.11.27a' +VERSION = '2024.12.09+desc-2024.12.11a' diff --git a/setup.py b/setup.py index 8950763101..64a5a70097 100755 --- a/setup.py +++ b/setup.py @@ -58,8 +58,6 @@ 'parsl/executors/high_throughput/interchange.py', 'parsl/executors/workqueue/exec_parsl_function.py', 'parsl/executors/workqueue/parsl_coprocess.py', - 'parsl/monitoring/node_reporter.py', - 'parsl/monitoring/davidadams_reporter.py', 'parsl/monitoring/allprocs.py', ],