Skip to content

Commit

Permalink
Factor interchange monitoring code into a ZMQRadioSender
Browse files Browse the repository at this point in the history
From an interchange perspective: this is a refactoring intended to clarify
that the interchange isn't doing anything special wrt. monitoring
messages and that it can send monitoring messages in the same way that
remote workers can.

From a monitoring perspective: this pulls ZMQ sender code out of the
interchange and puts it in a place that is more natural for ongoing
development. For example, a potential future use with Work Queue and
Task Vine is that workers would also benefit from using ZMQ to send
monitoring messages.

In some potential use cases, it might be desirable to configure the
radio used by the interchange instead of the hard-coded ZMQRadio. On-going
work in draft PR #3315 addresses configuration of different types of radio
and that work should be relevant here too.
  • Loading branch information
benclifford committed Jul 31, 2024
1 parent 5eb30f1 commit 1285bce
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 34 deletions.
64 changes: 30 additions & 34 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
Expand Down Expand Up @@ -219,36 +220,26 @@ def task_puller(self) -> NoReturn:
task_counter += 1
logger.debug(f"Fetched {task_counter} tasks so far")

def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
if self.hub_address and self.hub_zmq_port:
logger.info("Connecting to MonitoringHub")
# This is a one-off because monitoring is unencrypted
hub_channel = zmq.Context().socket(zmq.DEALER)
hub_channel.set_hwm(0)
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
logger.info("Connected to MonitoringHub")
return hub_channel
else:
return None

def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
if hub_channel:
def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
if monitoring_radio:
logger.info("Sending message {} to MonitoringHub".format(manager))

d: Dict = cast(Dict, manager.copy())
d['timestamp'] = datetime.datetime.now()
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])

hub_channel.send_pyobj((MessageType.NODE_INFO, d))
monitoring_radio.send((MessageType.NODE_INFO, d))

@wrap_with_logs(target="interchange")
def _command_server(self) -> NoReturn:
""" Command server to run async command to the interchange
"""
logger.debug("Command Server Starting")

# Need to create a new ZMQ socket for command server thread
hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
else:
monitoring_radio = None

reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...)

Expand Down Expand Up @@ -298,7 +289,7 @@ def _command_server(self) -> NoReturn:
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)
else:
logger.warning("Worker to hold was not in ready managers list")

Expand Down Expand Up @@ -333,9 +324,14 @@ def start(self) -> None:
# parent-process-inheritance problems.
signal.signal(signal.SIGTERM, signal.SIG_DFL)

logger.info("Incoming ports bound")
logger.info("Starting main interchange method")

hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
logger.debug("Creating monitoring radio")
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
logger.debug("Created monitoring radio")
else:
monitoring_radio = None

poll_period = self.poll_period

Expand Down Expand Up @@ -366,10 +362,10 @@ def start(self) -> None:
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event)
self.process_results_incoming(interesting_managers, hub_channel)
self.expire_bad_managers(interesting_managers, hub_channel)
self.expire_drained_managers(interesting_managers, hub_channel)
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
self.expire_bad_managers(interesting_managers, monitoring_radio)
self.expire_drained_managers(interesting_managers, monitoring_radio)
self.process_tasks_to_send(interesting_managers)

self.zmq_context.destroy()
Expand All @@ -380,7 +376,7 @@ def start(self) -> None:
def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
hub_channel: Optional[zmq.Socket],
monitoring_radio: Optional[MonitoringRadioSender],
kill_event: threading.Event
) -> None:
"""Process one message from manager on the task_outgoing channel.
Expand Down Expand Up @@ -434,7 +430,7 @@ def process_task_outgoing_incoming(
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
Expand Down Expand Up @@ -465,7 +461,7 @@ def process_task_outgoing_incoming(
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")

def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:

for manager_id in list(interesting_managers):
# is it always true that a draining manager will be in interesting managers?
Expand All @@ -478,7 +474,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel:
self._ready_managers.pop(manager_id)

m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers
Expand Down Expand Up @@ -521,7 +517,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
# Receive any results and forward to client
if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
logger.debug("entering results_incoming section")
Expand All @@ -541,11 +537,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
elif r['type'] == 'monitoring':
# the monitoring code makes the assumption that no
# monitoring messages will be received if monitoring
# is not configured, and that hub_channel will only
# is not configured, and that monitoring_radio will only
# be None when monitoring is not configurated.
assert hub_channel is not None
assert monitoring_radio is not None

hub_channel.send_pyobj(r['payload'])
monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
Expand Down Expand Up @@ -589,15 +585,15 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
interesting_managers.add(manager_id)
logger.debug("leaving results_incoming section")

def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
for tid in m['tasks']:
Expand Down
16 changes: 16 additions & 0 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from multiprocessing.queues import Queue
from typing import Optional

import zmq

from parsl.serialize import serialize

_db_manager_excepts: Optional[Exception]
Expand Down Expand Up @@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None:

def send(self, message: object) -> None:
self.queue.put((message, 0))


class ZMQRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over ZMQ. This radio is not
thread-safe, because its use of ZMQ is not thread-safe.
"""

def __init__(self, hub_address: str, hub_zmq_port: int) -> None:
self._hub_channel = zmq.Context().socket(zmq.DEALER)
self._hub_channel.set_hwm(0)
self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")

def send(self, message: object) -> None:
self._hub_channel.send_pyobj(message)

0 comments on commit 1285bce

Please sign in to comment.