diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 7421938e39..977e98bb45 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -714,16 +714,21 @@ def launch_task(self, task_record: TaskRecord) -> Future: if self.monitoring is not None and self.monitoring.resource_monitoring_enabled: wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO + + # this is only for UDP... it should be some kind of config-specific initialisation + # which could also start threads, and this should be one-shot + executor.monitoring_radio.ip = self.monitoring.hub_address # type: ignore[attr-defined] + executor.monitoring_radio.port = self.monitoring.udp_port # type: ignore[attr-defined] + (function, args, kwargs) = monitor_wrapper(f=function, args=args, kwargs=kwargs, x_try_id=try_id, x_task_id=task_id, - monitoring_hub_url=self.monitoring.monitoring_hub_url, + radio_config=executor.monitoring_radio, run_id=self.run_id, logging_level=wrapper_logging_level, sleep_dur=self.monitoring.resource_monitoring_interval, - radio_mode=executor.radio_mode, monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 909e5efef6..f634f9f39d 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -4,6 +4,7 @@ from typing_extensions import Literal, Self from parsl.jobs.states import JobStatus +from parsl.monitoring.radios import RadioConfig, UDPRadio class ParslExecutor(metaclass=ABCMeta): @@ -17,15 +18,13 @@ class ParslExecutor(metaclass=ABCMeta): no arguments and re-raises any thrown exception. In addition to the listed methods, a ParslExecutor instance must always - have a member field: + have these member fields: label: str - a human readable label for the executor, unique with respect to other executors. - Per-executor monitoring behaviour can be influenced by exposing: - - radio_mode: str - a string describing which radio mode should be used to - send task resource data back to the submit side. + monitoring_radio: RadioConfig describing how tasks on this executor + should report task resource status An executor may optionally expose: @@ -43,7 +42,10 @@ class ParslExecutor(metaclass=ABCMeta): """ label: str = "undefined" - radio_mode: str = "udp" + + def __init__(self) -> None: + self.monitoring_radio: RadioConfig + self.monitoring_radio = UDPRadio() def __enter__(self) -> Self: return self diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index e7dce07dba..6f87ab4faf 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -35,7 +35,7 @@ from parsl.data_provider.staging import Staging from parsl.addresses import get_all_addresses from parsl.process_loggers import wrap_with_logs - +from parsl.monitoring.radios import RadioConfig, HTEXRadio from parsl.multiprocessing import ForkProcess from parsl.utils import RepresentationMixin from parsl.providers import LocalProvider @@ -257,11 +257,13 @@ def __init__(self, enable_mpi_mode: bool = False, mpi_launcher: str = "mpiexec", block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, - encrypted: bool = False): + encrypted: bool = False, + monitoring_radio: Optional[RadioConfig] = None): logger.debug("Initializing HighThroughputExecutor") BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler) + self.label = label self.worker_debug = worker_debug self.storage_access = storage_access @@ -308,6 +310,12 @@ def __init__(self, self.run_id = None # set to the correct run_id in dfk self.hub_address = None # set to the correct hub address in dfk self.hub_port = None # set to the correct hub port in dfk + + if monitoring_radio is not None: + self.monitoring_radio = monitoring_radio + else: + self.monitoring_radio = HTEXRadio() + self.worker_ports = worker_ports self.worker_port_range = worker_port_range self.interchange_proc: Optional[Process] = None @@ -335,8 +343,6 @@ def __init__(self, launch_cmd = DEFAULT_LAUNCH_CMD self.launch_cmd = launch_cmd - radio_mode = "htex" - def _warn_deprecated(self, old: str, new: str): warnings.warn( f"{old} is deprecated and will be removed in a future release. " diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 0a2bc513dd..6ce836e578 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -801,12 +801,13 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_ "[%(levelname)s] %(message)s" logger = logging.getLogger(name) + logger2 = logging.getLogger("") logger.setLevel(logging.DEBUG) handler = logging.FileHandler(filename) handler.setLevel(level) formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S') handler.setFormatter(formatter) - logger.addHandler(handler) + logger2.addHandler(handler) return logger diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 3a6b88e113..e1a4e0adbd 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -79,6 +79,9 @@ def __init__(self, Parsl log directory paths. Logs and temp files go here. Default: '.' monitoring_debug : Bool Enable monitoring debug logging. Default: False + radio_config: RadioConfig + The monitoring radio to be used to send monitoring information from workers back to the + submit side. resource_monitoring_enabled : boolean Set this field to True to enable logging of information from the worker side. This will include environment information such as start time, hostname and block id, @@ -203,7 +206,22 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat udp_port, zmq_port = comm_q_result - self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) + self.udp_port = udp_port + self.zmq_port = zmq_port + + # need to initialize radio configs, perhaps first time a radio config is used + # in each executor? (can't do that at startup because executor list is dynamic, + # don't know all the executors till later) + # self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) + # How can this config be populated properly? + # There's a UDP port chosen right now by the monitoring router and + # sent back a line above... + # What does that look like for other radios? htexradio has no specific config at all, + # filesystem radio has a path (that should have been created?) for config, and a loop + # that needs to be running, started in this start method. + # so something like... radio_config.receive() generates the appropriate receiver object? + # which has a shutdown method on it for later. and also updates radio_config itself so + # it has the right info to send across the wire? or some state driving like that? context = zmq.Context() self.dfk_channel_timeout = 10000 # in milliseconds diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index ca51a0aae6..a98430a663 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -22,6 +22,31 @@ def send(self, message: object) -> None: pass +class RadioConfig(metaclass=ABCMeta): + """Base class for radio plugin configuration. + """ + # mode: str + # monitoring_hub_url: str + + @abstractmethod + def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender: + pass + + +class UDPRadio(RadioConfig): + ip: str + port: int + # this needs to be initialised by magic... + + def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender: + return UDPRadioSender(self.ip, self.port, source_id) + + +class HTEXRadio(RadioConfig): + def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender: + return HTEXRadioSender(source_id=source_id) + + class FilesystemRadioSender(MonitoringRadioSender): """A MonitoringRadioSender that sends messages over a shared filesystem. @@ -69,18 +94,7 @@ def send(self, message: object) -> None: class HTEXRadioSender(MonitoringRadioSender): - def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - source_id : str - String identifier of the source - timeout : int - timeout, default=10s - """ + def __init__(self, source_id: int): self.source_id = source_id logger.info("htex-based monitoring channel initialising") @@ -123,11 +137,12 @@ def send(self, message: object) -> None: class UDPRadioSender(MonitoringRadioSender): - def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): + def __init__(self, ip: str, port: int, source_id: int, timeout: int = 10) -> None: """ Parameters ---------- + XXX TODO monitoring_url : str URL of the form ://: source_id : str @@ -135,14 +150,10 @@ def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): timeout : int timeout, default=10s """ - self.monitoring_url = monitoring_url self.sock_timeout = timeout self.source_id = source_id - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) + self.ip = ip + self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index e95093223c..17695d67a8 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -9,7 +9,7 @@ from parsl.process_loggers import wrap_with_logs from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MonitoringRadioSender, UDPRadioSender, HTEXRadioSender, FilesystemRadioSender +from parsl.monitoring.radios import MonitoringRadioSender, RadioConfig from typing import Any, Callable, Dict, List, Sequence, Tuple logger = logging.getLogger(__name__) @@ -21,11 +21,10 @@ def monitor_wrapper(*, kwargs: Dict, # per invocation x_try_id: int, # per invocation x_task_id: int, # per invocation - monitoring_hub_url: str, # per workflow + radio_config: RadioConfig, # per executor run_id: str, # per workflow logging_level: int, # per workflow sleep_dur: float, # per workflow - radio_mode: str, # per executor monitor_resources: bool, # per workflow run_dir: str) -> Tuple[Callable, Sequence, Dict]: """Wrap the Parsl app with a function that will call the monitor function and point it at the correct pid when the task begins. @@ -39,9 +38,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: # Send first message to monitoring router send_first_message(try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, run_dir) if monitor_resources and sleep_dur > 0: @@ -50,9 +48,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: args=(os.getpid(), try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, logging_level, sleep_dur, run_dir, @@ -85,9 +82,9 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: send_last_message(try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, run_dir) + run_dir) new_kwargs = kwargs.copy() new_kwargs['_parsl_monitoring_task_id'] = x_task_id @@ -96,49 +93,42 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: +def get_radio(radio_config: RadioConfig, task_id: int, run_dir: str) -> MonitoringRadioSender: + radio: MonitoringRadioSender - if radio_mode == "udp": - radio = UDPRadioSender(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "htex": - radio = HTEXRadioSender(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "filesystem": - radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) - else: - raise RuntimeError(f"Unknown radio mode: {radio_mode}") + + radio = radio_config.create_sender(source_id=task_id, run_dir=run_dir) + return radio @wrap_with_logs def send_first_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str) -> None: - send_first_last_message(try_id, task_id, monitoring_hub_url, run_id, - radio_mode, run_dir, False) + radio_config: RadioConfig, + run_id: str, run_dir: str) -> None: + send_first_last_message(try_id, task_id, radio_config, run_id, + run_dir, False) @wrap_with_logs def send_last_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str) -> None: - send_first_last_message(try_id, task_id, monitoring_hub_url, run_id, - radio_mode, run_dir, True) + radio_config: RadioConfig, + run_id: str, run_dir: str) -> None: + send_first_last_message(try_id, task_id, radio_config, run_id, + run_dir, True) def send_first_last_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str, + radio_config: RadioConfig, + run_id: str, run_dir: str, is_last: bool) -> None: import platform import os - radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) + radio = get_radio(radio_config, task_id, run_dir) msg = (MessageType.RESOURCE_INFO, {'run_id': run_id, @@ -158,9 +148,8 @@ def send_first_last_message(try_id: int, def monitor(pid: int, try_id: int, task_id: int, - monitoring_hub_url: str, + radio_config: RadioConfig, run_id: str, - radio_mode: str, logging_level: int, sleep_dur: float, run_dir: str, @@ -183,7 +172,7 @@ def monitor(pid: int, setproctitle("parsl: task resource monitor") - radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) + radio = get_radio(radio_config, task_id, run_dir) logging.debug("start of monitor")