Skip to content

Commit

Permalink
plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Mar 29, 2024
1 parent 1a85eb8 commit 9ad2288
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 68 deletions.
9 changes: 7 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,16 +714,21 @@ def launch_task(self, task_record: TaskRecord) -> Future:

if self.monitoring is not None and self.monitoring.resource_monitoring_enabled:
wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO

# this is only for UDP... it should be some kind of config-specific initialisation
# which could also start threads, and this should be one-shot
executor.monitoring_radio.ip = self.monitoring.hub_address # type: ignore[attr-defined]
executor.monitoring_radio.port = self.monitoring.udp_port # type: ignore[attr-defined]

(function, args, kwargs) = monitor_wrapper(f=function,
args=args,
kwargs=kwargs,
x_try_id=try_id,
x_task_id=task_id,
monitoring_hub_url=self.monitoring.monitoring_hub_url,
radio_config=executor.monitoring_radio,
run_id=self.run_id,
logging_level=wrapper_logging_level,
sleep_dur=self.monitoring.resource_monitoring_interval,
radio_mode=executor.radio_mode,
monitor_resources=executor.monitor_resources(),
run_dir=self.run_dir)

Expand Down
14 changes: 8 additions & 6 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing_extensions import Literal, Self

from parsl.jobs.states import JobStatus
from parsl.monitoring.radios import RadioConfig, UDPRadio


class ParslExecutor(metaclass=ABCMeta):
Expand All @@ -17,15 +18,13 @@ class ParslExecutor(metaclass=ABCMeta):
no arguments and re-raises any thrown exception.
In addition to the listed methods, a ParslExecutor instance must always
have a member field:
have these member fields:
label: str - a human readable label for the executor, unique
with respect to other executors.
Per-executor monitoring behaviour can be influenced by exposing:
radio_mode: str - a string describing which radio mode should be used to
send task resource data back to the submit side.
monitoring_radio: RadioConfig describing how tasks on this executor
should report task resource status
An executor may optionally expose:
Expand All @@ -43,7 +42,10 @@ class ParslExecutor(metaclass=ABCMeta):
"""

label: str = "undefined"
radio_mode: str = "udp"

def __init__(self) -> None:
self.monitoring_radio: RadioConfig
self.monitoring_radio = UDPRadio()

def __enter__(self) -> Self:
return self
Expand Down
14 changes: 10 additions & 4 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from parsl.data_provider.staging import Staging
from parsl.addresses import get_all_addresses
from parsl.process_loggers import wrap_with_logs

from parsl.monitoring.radios import RadioConfig, HTEXRadio
from parsl.multiprocessing import ForkProcess
from parsl.utils import RepresentationMixin
from parsl.providers import LocalProvider
Expand Down Expand Up @@ -257,11 +257,13 @@ def __init__(self,
enable_mpi_mode: bool = False,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
monitoring_radio: Optional[RadioConfig] = None):

logger.debug("Initializing HighThroughputExecutor")

BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler)

self.label = label
self.worker_debug = worker_debug
self.storage_access = storage_access
Expand Down Expand Up @@ -308,6 +310,12 @@ def __init__(self,
self.run_id = None # set to the correct run_id in dfk
self.hub_address = None # set to the correct hub address in dfk
self.hub_port = None # set to the correct hub port in dfk

if monitoring_radio is not None:
self.monitoring_radio = monitoring_radio
else:
self.monitoring_radio = HTEXRadio()

self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.interchange_proc: Optional[Process] = None
Expand Down Expand Up @@ -335,8 +343,6 @@ def __init__(self,
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd

radio_mode = "htex"

def _warn_deprecated(self, old: str, new: str):
warnings.warn(
f"{old} is deprecated and will be removed in a future release. "
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,12 +801,13 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
"[%(levelname)s] %(message)s"

logger = logging.getLogger(name)
logger2 = logging.getLogger("")
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(filename)
handler.setLevel(level)
formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger2.addHandler(handler)
return logger


Expand Down
20 changes: 19 additions & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def __init__(self,
Parsl log directory paths. Logs and temp files go here. Default: '.'
monitoring_debug : Bool
Enable monitoring debug logging. Default: False
radio_config: RadioConfig
The monitoring radio to be used to send monitoring information from workers back to the
submit side.
resource_monitoring_enabled : boolean
Set this field to True to enable logging of information from the worker side.
This will include environment information such as start time, hostname and block id,
Expand Down Expand Up @@ -203,7 +206,22 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat

udp_port, zmq_port = comm_q_result

self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
self.udp_port = udp_port
self.zmq_port = zmq_port

# need to initialize radio configs, perhaps first time a radio config is used
# in each executor? (can't do that at startup because executor list is dynamic,
# don't know all the executors till later)
# self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
# How can this config be populated properly?
# There's a UDP port chosen right now by the monitoring router and
# sent back a line above...
# What does that look like for other radios? htexradio has no specific config at all,
# filesystem radio has a path (that should have been created?) for config, and a loop
# that needs to be running, started in this start method.
# so something like... radio_config.receive() generates the appropriate receiver object?
# which has a shutdown method on it for later. and also updates radio_config itself so
# it has the right info to send across the wire? or some state driving like that?

context = zmq.Context()
self.dfk_channel_timeout = 10000 # in milliseconds
Expand Down
49 changes: 30 additions & 19 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,31 @@ def send(self, message: object) -> None:
pass


class RadioConfig(metaclass=ABCMeta):
"""Base class for radio plugin configuration.
"""
# mode: str
# monitoring_hub_url: str

@abstractmethod
def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender:
pass


class UDPRadio(RadioConfig):
ip: str
port: int
# this needs to be initialised by magic...

def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender:
return UDPRadioSender(self.ip, self.port, source_id)


class HTEXRadio(RadioConfig):
def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender:
return HTEXRadioSender(source_id=source_id)


class FilesystemRadioSender(MonitoringRadioSender):
"""A MonitoringRadioSender that sends messages over a shared filesystem.
Expand Down Expand Up @@ -69,18 +94,7 @@ def send(self, message: object) -> None:

class HTEXRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
"""
Parameters
----------
monitoring_url : str
URL of the form <scheme>://<IP>:<PORT>
source_id : str
String identifier of the source
timeout : int
timeout, default=10s
"""
def __init__(self, source_id: int):
self.source_id = source_id
logger.info("htex-based monitoring channel initialising")

Expand Down Expand Up @@ -123,26 +137,23 @@ def send(self, message: object) -> None:

class UDPRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
def __init__(self, ip: str, port: int, source_id: int, timeout: int = 10) -> None:
"""
Parameters
----------
XXX TODO
monitoring_url : str
URL of the form <scheme>://<IP>:<PORT>
source_id : str
String identifier of the source
timeout : int
timeout, default=10s
"""
self.monitoring_url = monitoring_url
self.sock_timeout = timeout
self.source_id = source_id
try:
self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':'))
self.port = int(port)
except Exception:
raise Exception("Failed to parse monitoring url: {}".format(monitoring_url))
self.ip = ip
self.port = port

self.sock = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM,
Expand Down
59 changes: 24 additions & 35 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from parsl.process_loggers import wrap_with_logs

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, UDPRadioSender, HTEXRadioSender, FilesystemRadioSender
from parsl.monitoring.radios import MonitoringRadioSender, RadioConfig
from typing import Any, Callable, Dict, List, Sequence, Tuple

logger = logging.getLogger(__name__)
Expand All @@ -21,11 +21,10 @@ def monitor_wrapper(*,
kwargs: Dict, # per invocation
x_try_id: int, # per invocation
x_task_id: int, # per invocation
monitoring_hub_url: str, # per workflow
radio_config: RadioConfig, # per executor
run_id: str, # per workflow
logging_level: int, # per workflow
sleep_dur: float, # per workflow
radio_mode: str, # per executor
monitor_resources: bool, # per workflow
run_dir: str) -> Tuple[Callable, Sequence, Dict]:
"""Wrap the Parsl app with a function that will call the monitor function and point it at the correct pid when the task begins.
Expand All @@ -39,9 +38,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
# Send first message to monitoring router
send_first_message(try_id,
task_id,
monitoring_hub_url,
radio_config,
run_id,
radio_mode,
run_dir)

if monitor_resources and sleep_dur > 0:
Expand All @@ -50,9 +48,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
args=(os.getpid(),
try_id,
task_id,
monitoring_hub_url,
radio_config,
run_id,
radio_mode,
logging_level,
sleep_dur,
run_dir,
Expand Down Expand Up @@ -85,9 +82,9 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:

send_last_message(try_id,
task_id,
monitoring_hub_url,
radio_config,
run_id,
radio_mode, run_dir)
run_dir)

new_kwargs = kwargs.copy()
new_kwargs['_parsl_monitoring_task_id'] = x_task_id
Expand All @@ -96,49 +93,42 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
return (wrapped, args, new_kwargs)


def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender:
def get_radio(radio_config: RadioConfig, task_id: int, run_dir: str) -> MonitoringRadioSender:

radio: MonitoringRadioSender
if radio_mode == "udp":
radio = UDPRadioSender(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadioSender(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")

radio = radio_config.create_sender(source_id=task_id, run_dir=run_dir)

return radio


@wrap_with_logs
def send_first_message(try_id: int,
task_id: int,
monitoring_hub_url: str,
run_id: str, radio_mode: str, run_dir: str) -> None:
send_first_last_message(try_id, task_id, monitoring_hub_url, run_id,
radio_mode, run_dir, False)
radio_config: RadioConfig,
run_id: str, run_dir: str) -> None:
send_first_last_message(try_id, task_id, radio_config, run_id,
run_dir, False)


@wrap_with_logs
def send_last_message(try_id: int,
task_id: int,
monitoring_hub_url: str,
run_id: str, radio_mode: str, run_dir: str) -> None:
send_first_last_message(try_id, task_id, monitoring_hub_url, run_id,
radio_mode, run_dir, True)
radio_config: RadioConfig,
run_id: str, run_dir: str) -> None:
send_first_last_message(try_id, task_id, radio_config, run_id,
run_dir, True)


def send_first_last_message(try_id: int,
task_id: int,
monitoring_hub_url: str,
run_id: str, radio_mode: str, run_dir: str,
radio_config: RadioConfig,
run_id: str, run_dir: str,
is_last: bool) -> None:
import platform
import os

radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir)
radio = get_radio(radio_config, task_id, run_dir)

msg = (MessageType.RESOURCE_INFO,
{'run_id': run_id,
Expand All @@ -158,9 +148,8 @@ def send_first_last_message(try_id: int,
def monitor(pid: int,
try_id: int,
task_id: int,
monitoring_hub_url: str,
radio_config: RadioConfig,
run_id: str,
radio_mode: str,
logging_level: int,
sleep_dur: float,
run_dir: str,
Expand All @@ -183,7 +172,7 @@ def monitor(pid: int,

setproctitle("parsl: task resource monitor")

radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir)
radio = get_radio(radio_config, task_id, run_dir)

logging.debug("start of monitor")

Expand Down

0 comments on commit 9ad2288

Please sign in to comment.