Skip to content

Commit

Permalink
work on pluggable API for worker-side monitoring radio
Browse files Browse the repository at this point in the history
patch so far is overloading self.monitoring_radio for two different uses that should be clarified:
submit side radio and worker side radio

It's a bit complicated to have a default radio (UDPRadio or HTEXRadio) even when monitoring is turned off:
I should check that the radio receiver does not get activated in this case. For example:
  * test that broken radio activation causes a startup error
  * test that configuration with broken radio doesn't cause a startup error when monitoringhub is not configured

zmq radio should always listen, and be the place where all radio receivers send their data.

udp radio and filesystem radio should turn into separate... something... processes?

For prototyping, I guess it doesn't matter where I make them live, but the most behaviour-preserving option would keep them somehow separated?
  • Loading branch information
benclifford committed Jun 24, 2024
1 parent c01f991 commit 59dd1f4
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 155 deletions.
21 changes: 19 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,16 +744,21 @@ def launch_task(self, task_record: TaskRecord) -> Future:

if self.monitoring is not None and self.monitoring.resource_monitoring_enabled:
wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO

# this is only for UDP... it should be some kind of config-specific initialisation
# which could also start threads, and this should be one-shot
# executor.monitoring_radio.ip = self.monitoring.hub_address # type: ignore[attr-defined]
# executor.monitoring_radio.port = self.monitoring.udp_port # type: ignore[attr-defined]

(function, args, kwargs) = monitor_wrapper(f=function,
args=args,
kwargs=kwargs,
x_try_id=try_id,
x_task_id=task_id,
monitoring_hub_url=self.monitoring.monitoring_hub_url,
radio_config=executor.remote_monitoring_radio_config,
run_id=self.run_id,
logging_level=wrapper_logging_level,
sleep_dur=self.monitoring.resource_monitoring_interval,
radio_mode=executor.radio_mode,
monitor_resources=executor.monitor_resources(),
run_dir=self.run_dir)

Expand Down Expand Up @@ -1181,6 +1186,18 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
executor.hub_address = self.monitoring.hub_address
executor.hub_zmq_port = self.monitoring.hub_zmq_port
executor.submit_monitoring_radio = self.monitoring.radio
# this will modify the radio config object: it will add relevant parameters needed
# for the particular remote radio sender to communicate back
logger.info("starting monitoring receiver "
f"for executor {executor} "
f"with remote monitoring radio config {executor.remote_monitoring_radio_config}")
executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config,
ip=self.monitoring.hub_address)
# TODO: this is a weird way to start the receiver.
# Rather than in executor.start, but there's a tangle here
# trying to make the executors usable in a non-pure-parsl
# context where there is no DFK to grab config out of?
# (and no monitoring...)
if hasattr(executor, 'provider'):
if hasattr(executor.provider, 'script_dir'):
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
Expand Down
42 changes: 34 additions & 8 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import logging
import os
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadioSender
from parsl.monitoring.radios import (
MonitoringRadioReceiver,
MonitoringRadioSender,
RadioConfig,
UDPRadio,
)

logger = logging.getLogger(__name__)


class ParslExecutor(metaclass=ABCMeta):
Expand All @@ -19,15 +27,13 @@ class ParslExecutor(metaclass=ABCMeta):
no arguments and re-raises any thrown exception.
In addition to the listed methods, a ParslExecutor instance must always
have a member field:
have these member fields:
label: str - a human readable label for the executor, unique
with respect to other executors.
Per-executor monitoring behaviour can be influenced by exposing:
radio_mode: str - a string describing which radio mode should be used to
send task resource data back to the submit side.
remote_monitoring_radio_config: RadioConfig describing how tasks on this executor
should report task resource status
An executor may optionally expose:
Expand All @@ -45,11 +51,16 @@ class ParslExecutor(metaclass=ABCMeta):
"""

label: str = "undefined"
radio_mode: str = "udp"

def __init__(
self,
*,

# TODO: I'd like these two to go away but they're needed right now
# to configure the interchange monitoring radio, that is
# in addition to the submit and worker monitoring radios (!). They
# are effectively a third monitoring radio config, though, so what
# should that look like for the interchange?
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
Expand All @@ -58,10 +69,19 @@ def __init__(
):
self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port

# these are parameters for the monitoring radio to be used on the remote side
# eg. in workers - to send results back, and they should end up encapsulated
# inside a RadioConfig.
self.submit_monitoring_radio = submit_monitoring_radio
self.remote_monitoring_radio_config: RadioConfig = UDPRadio()

self.run_dir = os.path.abspath(run_dir)
self.run_id = run_id

# will be set externally later, which is pretty ugly
self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None

def __enter__(self) -> Self:
return self

Expand Down Expand Up @@ -94,7 +114,13 @@ def shutdown(self) -> None:
This includes all attached resources such as workers and controllers.
"""
pass
logger.debug("Starting base executor shutdown")
# logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}")
if self.monitoring_receiver is not None:
logger.debug("Starting monitoring receiver shutdown")
self.monitoring_receiver.shutdown()
logger.debug("Done with monitoring receiver shutdown")
logger.debug("Done with base executor shutdown")

def monitor_resources(self) -> bool:
"""Should resource monitoring happen for tasks on running on this executor?
Expand Down
16 changes: 13 additions & 3 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.monitoring.radios import HTEXRadio, RadioConfig
from parsl.process_loggers import wrap_with_logs
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider
Expand Down Expand Up @@ -254,11 +255,13 @@ def __init__(self,
enable_mpi_mode: bool = False,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
remote_monitoring_radio_config: Optional[RadioConfig] = None):

logger.debug("Initializing HighThroughputExecutor")

BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler)

self.label = label
self.worker_debug = worker_debug
self.storage_access = storage_access
Expand Down Expand Up @@ -302,6 +305,12 @@ def __init__(self,
self._workers_per_node = 1 # our best guess-- we do not have any provider hints

self._task_counter = 0

if remote_monitoring_radio_config is not None:
self.remote_monitoring_radio_config = remote_monitoring_radio_config
else:
self.remote_monitoring_radio_config = HTEXRadio()

self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.interchange_proc: Optional[subprocess.Popen] = None
Expand Down Expand Up @@ -329,8 +338,6 @@ def __init__(self,
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd

radio_mode = "htex"

def _warn_deprecated(self, old: str, new: str):
warnings.warn(
f"{old} is deprecated and will be removed in a future release. "
Expand Down Expand Up @@ -822,6 +829,9 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Unable to terminate Interchange process; sending SIGKILL")
self.interchange_proc.kill()

# TODO: implement this across all executors
super().shutdown()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,13 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
"[%(levelname)s] %(message)s"

logger = logging.getLogger(name)
logger2 = logging.getLogger("")
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(filename)
handler.setLevel(level)
formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger2.addHandler(handler)
return logger


Expand Down
2 changes: 2 additions & 0 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ def shutdown(self, *args, **kwargs):
self._finished_task_queue.close()
self._finished_task_queue.join_thread()

super().shutdown()

logger.debug("TaskVine shutdown completed")

@wrap_with_logs
Expand Down
1 change: 1 addition & 0 deletions parsl/executors/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def shutdown(self, block=True):
"""
logger.debug("Shutting down executor, which involves waiting for running tasks to complete")
self.executor.shutdown(wait=block)
super().shutdown()
logger.debug("Done with executor shutdown")

def monitor_resources(self):
Expand Down
2 changes: 2 additions & 0 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,8 @@ def shutdown(self, *args, **kwargs):
self.collector_queue.close()
self.collector_queue.join_thread()

super().shutdown()

logger.debug("Work Queue shutdown completed")

@wrap_with_logs
Expand Down
58 changes: 41 additions & 17 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
import os
import queue
import time
from multiprocessing import Event, Process
from multiprocessing import Event
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast

import typeguard

from parsl.log_utils import set_file_logger
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.radios import MultiprocessingQueueRadioSender, RadioConfig
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import AddressedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down Expand Up @@ -128,7 +128,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
# in the future, Queue will allow runtime subscripts.

if TYPE_CHECKING:
comm_q: Queue[Union[Tuple[int, int], str]]
comm_q: Queue[Union[int, str]]
else:
comm_q: Queue

Expand Down Expand Up @@ -156,7 +156,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs,
self.block_msgs, self.resource_msgs, self.router_exit_event),
kwargs={"hub_address": self.hub_address,
"udp_port": self.hub_port,
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
Expand All @@ -179,13 +178,13 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.dbm_proc.start()
logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid))

self.filesystem_proc = Process(target=filesystem_receiver,
args=(self.logdir, self.resource_msgs, dfk_run_dir),
name="Monitoring-Filesystem-Process",
daemon=True
)
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
# self.filesystem_proc = Process(target=filesystem_receiver,
# args=(self.logdir, self.resource_msgs, dfk_run_dir),
# name="Monitoring-Filesystem-Process",
# daemon=True
# )
# self.filesystem_proc.start()
# logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadioSender(self.block_msgs)

Expand All @@ -201,9 +200,23 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")
raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}")

udp_port, zmq_port = comm_q_result
zmq_port = comm_q_result

self.zmq_port = zmq_port

self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
# need to initialize radio configs, perhaps first time a radio config is used
# in each executor? (can't do that at startup because executor list is dynamic,
# don't know all the executors till later)
# self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
# How can this config be populated properly?
# There's a UDP port chosen right now by the monitoring router and
# sent back a line above...
# What does that look like for other radios? htexradio has no specific config at all,
# filesystem radio has a path (that should have been created?) for config, and a loop
# that needs to be running, started in this start method.
# so something like... radio_config.receive() generates the appropriate receiver object?
# which has a shutdown method on it for later. and also updates radio_config itself so
# it has the right info to send across the wire? or some state driving like that?

logger.info("Monitoring Hub initialized")

Expand Down Expand Up @@ -235,7 +248,7 @@ def close(self) -> None:
)
self.router_proc.terminate()
self.dbm_proc.terminate()
self.filesystem_proc.terminate()
# self.filesystem_proc.terminate()
logger.info("Setting router termination event")
self.router_exit_event.set()
logger.info("Waiting for router to terminate")
Expand All @@ -255,9 +268,9 @@ def close(self) -> None:
# should this be message based? it probably doesn't need to be if
# we believe we've received all messages
logger.info("Terminating filesystem radio receiver process")
self.filesystem_proc.terminate()
self.filesystem_proc.join()
self.filesystem_proc.close()
# self.filesystem_proc.terminate()
# self.filesystem_proc.join()
# self.filesystem_proc.close()

logger.info("Closing monitoring multiprocessing queues")
self.exception_q.close()
Expand All @@ -272,6 +285,17 @@ def close(self) -> None:
self.block_msgs.join_thread()
logger.info("Closed monitoring multiprocessing queues")

def start_receiver(self, radio_config: RadioConfig, ip: str) -> Any:
    """Create a monitoring radio receiver from ``radio_config`` and return it.

    The work is delegated to ``radio_config.create_receiver``, which is
    given ``ip`` and this object's ``resource_msgs`` as keyword arguments.
    Whatever that call produces is logged and handed back unchanged.

    The intent (still work in progress) is that the returned object is
    something the caller can call ``shutdown()`` on later, and that
    ``create_receiver`` may update ``radio_config`` with whatever needs to
    be sent over the wire to the remote side -- NOTE(review): neither is
    enforced here; confirm against the RadioConfig implementations.

    :param radio_config: the radio configuration to create a receiver for.
    :param ip: address passed through to ``create_receiver``.
    :return: the object returned by ``radio_config.create_receiver``.
    """
    # TODO: return a "shutdownable" -- an object we only require a
    # shutdown() method on, without expecting any more structure.
    receiver = radio_config.create_receiver(
        ip=ip,
        resource_msgs=self.resource_msgs,
    )
    logger.info(f"BENC: created receiver {receiver}")
    # No assertion that the receiver is not None: the result is passed
    # back as-is, whatever create_receiver returned.
    return receiver


@wrap_with_logs
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
Expand Down
Loading

0 comments on commit 59dd1f4

Please sign in to comment.