Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move monitoring radios to own modules #3707

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down
2 changes: 1 addition & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from parsl.log_utils import set_file_logger
from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down
191 changes: 0 additions & 191 deletions parsl/monitoring/radios.py

This file was deleted.

Empty file.
13 changes: 13 additions & 0 deletions parsl/monitoring/radios/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Optional

# Bare annotation only: no value is assigned, so this name is not bound at
# runtime. NOTE(review): nothing visible in this module reads it -- it looks
# like a leftover from before the radios were split into separate modules;
# confirm it is unused elsewhere before removing.
_db_manager_excepts: Optional[Exception]

logger = logging.getLogger(__name__)


class MonitoringRadioSender(metaclass=ABCMeta):
    """Abstract interface for objects that can send a monitoring message.

    Concrete radios subclass this and implement :meth:`send`; the class
    cannot be instantiated until that method is overridden.
    """

    @abstractmethod
    def send(self, message: object) -> None:
        """Send a single monitoring message.

        Parameters
        ----------
        message : object
            The message to deliver. This base implementation does nothing
            with it and returns ``None``.
        """
52 changes: 52 additions & 0 deletions parsl/monitoring/radios/filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
import os
import pickle
import uuid

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class FilesystemRadioSender(MonitoringRadioSender):
    """A MonitoringRadioSender that sends messages over a shared filesystem.

    The message directory structure is based on maildir,
    https://en.wikipedia.org/wiki/Maildir

    The writer creates a message in tmp/ and then when it is fully
    written, moves it atomically into new/

    The reader ignores tmp/ and only reads and deletes messages from
    new/

    This avoids a race condition of reading partially written messages.

    This radio is likely to give higher shared filesystem load compared to
    the UDP radio, but should be much more reliable.
    """

    def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str):
        """Create the ``tmp`` and ``new`` message directories under ``run_dir``.

        Parameters
        ----------
        monitoring_url : str
            Not used by this radio; the parameter is kept so the constructor
            signature is unchanged for existing callers.
        timeout : int
            Not used by this radio; kept for the same reason.
        run_dir : str
            Directory under which ``monitor-fs-radio/`` is created.
        """
        logger.info("filesystem based monitoring channel initializing")
        self.base_path = f"{run_dir}/monitor-fs-radio/"
        # base_path already ends in a separator; os.path.join will not add a
        # second one, so these paths no longer contain a doubled "//".
        self.tmp_path = os.path.join(self.base_path, "tmp")
        self.new_path = os.path.join(self.base_path, "new")

        os.makedirs(self.tmp_path, exist_ok=True)
        os.makedirs(self.new_path, exist_ok=True)

    def send(self, message: object) -> None:
        """Pickle ``message`` into ``tmp/`` and then move it into ``new/``.

        Parameters
        ----------
        message : object
            Arbitrary pickle-able object that is to be sent.
        """
        logger.info("Sending a monitoring message via filesystem")

        # A fresh UUID per message gives each one its own file name.
        unique_id = str(uuid.uuid4())

        tmp_filename = os.path.join(self.tmp_path, unique_id)
        new_filename = os.path.join(self.new_path, unique_id)

        # this will write the message out then atomically
        # move it into new/, so that a partially written
        # file will never be observed in new/
        with open(tmp_filename, "wb") as f:
            pickle.dump(message, f)
        os.rename(tmp_filename, new_filename)
57 changes: 57 additions & 0 deletions parsl/monitoring/radios/htex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
import pickle

from parsl.monitoring.radios.base import MonitoringRadioSender

logger = logging.getLogger(__name__)


class HTEXRadioSender(MonitoringRadioSender):
    """A MonitoringRadioSender that sends messages via the htex result queue.

    Each message is wrapped in a dict tagged ``'type': 'monitoring'``,
    pickled, and put on the ``result_queue`` exposed by
    ``parsl.executors.high_throughput.monitoring_info``.
    """

    def __init__(self, monitoring_url: str, timeout: int = 10):
        """
        Parameters
        ----------

        monitoring_url : str
            URL of the form <scheme>://<IP>:<PORT>. Not used by this radio;
            accepted so the constructor signature is unchanged.
        timeout : int
            timeout, default=10s. Not used by this radio.
        """
        logger.info("htex-based monitoring channel initialising")

    def send(self, message: object) -> None:
        """ Sends a message via the htex result queue

        Parameters
        ----------

        message: object
            Arbitrary pickle-able object that is to be sent

        Returns:
            None
        """

        # The module attribute is looked up on every call, so whatever queue
        # is installed at send time is the one that gets used.
        import parsl.executors.high_throughput.monitoring_info

        result_queue = parsl.executors.high_throughput.monitoring_info.result_queue

        # this message needs to go in the result queue tagged so that it is treated
        # i) as a monitoring message by the interchange, and then further more treated
        # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO
        # which is the implicit default for messages from the interchange)

        # for the interchange, the outer wrapper, this needs to be a dict:

        interchange_msg = {
            'type': 'monitoring',
            'payload': message
        }

        # Compare against None explicitly: the question is whether a queue has
        # been installed, not whether the queue object happens to be truthy.
        if result_queue is not None:
            result_queue.put(pickle.dumps(interchange_msg))
        else:
            logger.error("result_queue is uninitialized - cannot put monitoring message")
17 changes: 17 additions & 0 deletions parsl/monitoring/radios/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from multiprocessing.queues import Queue

from parsl.monitoring.radios.base import MonitoringRadioSender


class MultiprocessingQueueRadioSender(MonitoringRadioSender):
    """Monitoring radio that delivers messages through a multiprocessing Queue.

    Intended for the submit side: components in the submit process, or
    processes launched by multiprocessing, have access to a Queue shared
    with the monitoring database code, so messages bypass the monitoring
    router.
    """

    def __init__(self, queue: Queue) -> None:
        """Remember the queue that :meth:`send` will put messages on."""
        self.queue = queue

    def send(self, message: object) -> None:
        """Put ``message`` on the queue given at construction time."""
        self.queue.put(message)
Loading