Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace DFK and JobStatusPoller monitoring zmq channels with Queue radio #3338

Merged
merged 2 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ def __init__(self, config: Config) -> None:

# this must be set before executors are added since add_executors calls
# job_status_poller.add_executors.
radio = self.monitoring.radio if self.monitoring else None
self.job_status_poller = JobStatusPoller(strategy=self.config.strategy,
strategy_period=self.config.strategy_period,
max_idletime=self.config.max_idletime,
dfk=self)
monitoring=radio)

self.executors: Dict[str, ParslExecutor] = {}

Expand Down
29 changes: 7 additions & 22 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import parsl
import time
import zmq
from typing import Dict, List, Sequence, Optional, Union

from parsl.jobs.states import JobStatus, JobState
Expand All @@ -17,25 +16,11 @@


class PolledExecutorFacade:
def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None):
def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None):
self._executor = executor
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]

# Create a ZMQ channel to send poll status to monitoring

self.hub_channel: Optional[zmq.Socket]

if dfk and dfk.monitoring is not None:
hub_address = dfk.hub_address
hub_port = dfk.hub_zmq_port
context = zmq.Context()
self.hub_channel = context.socket(zmq.DEALER)
self.hub_channel.set_hwm(0)
self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port))
logger.info("Monitoring enabled on job status poller")
else:
self.hub_channel = None
self._monitoring = monitoring

def poll(self) -> None:
now = time.time()
Expand All @@ -54,10 +39,10 @@ def poll(self) -> None:

def send_monitoring_info(self, status: Dict) -> None:
# Send monitoring info for HTEX when monitoring enabled
if self.hub_channel:
if self._monitoring:
msg = self._executor.create_monitoring_info(status)
logger.debug("Sending message {} to hub from job status poller".format(msg))
self.hub_channel.send_pyobj((MessageType.BLOCK_INFO, msg))
self._monitoring.send((MessageType.BLOCK_INFO, msg))

@property
def status(self) -> Dict[str, JobStatus]:
Expand Down Expand Up @@ -107,9 +92,9 @@ def __repr__(self) -> str:
class JobStatusPoller(Timer):
def __init__(self, *, strategy: Optional[str], max_idletime: float,
strategy_period: Union[float, int],
dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None) -> None:
self._executor_facades = [] # type: List[PolledExecutorFacade]
self.dfk = dfk
self.monitoring = monitoring
self._strategy = Strategy(strategy=strategy,
max_idletime=max_idletime)
super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller")
Expand All @@ -131,7 +116,7 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
for executor in executors:
if executor.status_polling_interval > 0:
logger.debug("Adding executor {}".format(executor.label))
self._executor_facades.append(PolledExecutorFacade(executor, self.dfk))
self._executor_facades.append(PolledExecutorFacade(executor, self.monitoring))
self._strategy.add_executors(executors)

def close(self):
Expand Down
27 changes: 5 additions & 22 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import multiprocessing.synchronize as ms
import typeguard
import zmq

import queue

Expand All @@ -20,6 +19,7 @@

from parsl.serialize import deserialize

from parsl.monitoring.radios import MultiprocessingQueueRadio
from parsl.monitoring.router import router_starter
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.types import AddressedMonitoringMessage
Expand Down Expand Up @@ -92,12 +92,6 @@ def __init__(self,
Default: 30 seconds
"""

# Any is used to disable typechecking on uses of _dfk_channel,
# because it is used in the code as if it points to a channel, but
# the static type is that it can also be None. The code relies on
# .start() being called and initialising this to a real channel.
self._dfk_channel = None # type: Any

if _db_manager_excepts:
raise _db_manager_excepts

Expand Down Expand Up @@ -197,6 +191,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadio(self.block_msgs)

try:
comm_q_result = comm_q.get(block=True, timeout=120)
except queue.Empty:
Expand All @@ -211,26 +207,14 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat

self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)

context = zmq.Context()
self.dfk_channel_timeout = 10000 # in milliseconds
self._dfk_channel = context.socket(zmq.DEALER)
self._dfk_channel.setsockopt(zmq.LINGER, 0)
self._dfk_channel.set_hwm(0)
self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout)
self._dfk_channel.connect("tcp://{}:{}".format(self.hub_address, zmq_port))

logger.info("Monitoring Hub initialized")

return zmq_port

# TODO: tighten the Any message format
def send(self, mtype: MessageType, message: Any) -> None:
logger.debug("Sending message type {}".format(mtype))
try:
self._dfk_channel.send_pyobj((mtype, message))
except zmq.Again:
logger.exception(
"The monitoring message sent from DFK to router timed-out after {}ms".format(self.dfk_channel_timeout))
self.radio.send((mtype, message))

def close(self) -> None:
logger.info("Terminating Monitoring Hub")
Expand All @@ -241,9 +225,8 @@ def close(self) -> None:
logger.error("There was a queued exception (Either router or DBM process got exception much earlier?)")
except queue.Empty:
break
if self._dfk_channel and self.monitoring_hub_active:
if self.monitoring_hub_active:
self.monitoring_hub_active = False
self._dfk_channel.close()
if exception_msgs:
for exception_msg in exception_msgs:
logger.error(
Expand Down
15 changes: 15 additions & 0 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from abc import ABCMeta, abstractmethod

from multiprocessing.queues import Queue
from typing import Optional

from parsl.serialize import serialize
Expand Down Expand Up @@ -173,3 +174,17 @@ def send(self, message: object) -> None:
logging.error("Could not send message within timeout limit")
return
return


class MultiprocessingQueueRadio(MonitoringRadio):
"""A monitoring radio intended which connects over a multiprocessing Queue.
This radio is intended to be used on the submit side, where components
in the submit process, or processes launched by multiprocessing, will have
access to a Queue shared with the monitoring database code (bypassing the
monitoring router).
"""
def __init__(self, queue: Queue) -> None:
self.queue = queue
Comment on lines +186 to +187
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion to leave the namespace on the type declaration. Would have saved this reviewer at least one (more) look at the top of the file (and later on when rereading this in N months) to confirm that this is, in fact, an MP queue. No biggie, but a suggestion.


def send(self, message: object) -> None:
self.queue.put((message, 0))
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 10 additions & 1 deletion parsl/tests/test_monitoring/test_fuzz_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
import socket
import time
import zmq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,8 +49,16 @@ def test_row_counts():
s.connect((hub_address, hub_zmq_port))
s.sendall(b'fuzzing\r')

context = zmq.Context()
channel_timeout = 10000 # in milliseconds
hub_channel = context.socket(zmq.DEALER)
hub_channel.setsockopt(zmq.LINGER, 0)
hub_channel.set_hwm(0)
hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port))

# this will send a non-object down the DFK's existing ZMQ connection
parsl.dfk().monitoring._dfk_channel.send(b'FuzzyByte\rSTREAM')
hub_channel.send(b'FuzzyByte\rSTREAM')

# This following attack is commented out, because monitoring is not resilient
# to this.
Expand Down
Loading