Skip to content

Commit

Permalink
Dispatch monitoring messages on tag type, instead of queue name (#2168)
Browse files Browse the repository at this point in the history
* Dispatch monitoring messages on tag type, instead of queue name

This means monitoring messages are handled based on their
declared tag type, rather than how they are received by the
database manager.

Specifically, RESOURCE_INFO messages are now handled more
like other messages.

This is groundwork for future PRs which will, for example,
allow remote tasks to deliver their resource messages over
htex's existing channels, rather than via UDP.
  • Loading branch information
benclifford authored Mar 31, 2022
1 parent d8d53ac commit 4488c0b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
30 changes: 23 additions & 7 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,21 +539,37 @@ def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kil
assert len(x) == 2
assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \
"_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0])
self.pending_priority_queue.put(cast(Any, x))
self._dispatch_to_internal(x)
elif queue_tag == 'resource':
assert x[0] == MessageType.RESOURCE_INFO, "_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue"
body = x[1]
assert len(body) == 3
self.pending_resource_queue.put(body[-1])
self._dispatch_to_internal(x)
elif queue_tag == 'node':
assert len(x) == 2, "expected message tuple to have exactly two elements"
assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue"

self.pending_node_queue.put(x[-1])
self._dispatch_to_internal(x)
elif queue_tag == "block":
self.pending_block_queue.put(x[-1])
self._dispatch_to_internal(x)
else:
raise RuntimeError(f"queue_tag {queue_tag} is unknown")
logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}")

def _dispatch_to_internal(self, x: Tuple) -> None:
if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]:
self.pending_priority_queue.put(cast(Any, x))
elif x[0] == MessageType.RESOURCE_INFO:
body = x[1]
assert len(body) == 3
self.pending_resource_queue.put(body[-1])
elif x[0] == MessageType.NODE_INFO:
assert len(x) == 2, "expected NODE_INFO tuple to have exactly two elements"

logger.info("Will put {} to pending node queue".format(x[1]))
self.pending_node_queue.put(x[1])
elif x[0] == MessageType.BLOCK_INFO:
logger.info("Will put {} to pending block queue".format(x[1]))
self.pending_block_queue.put(x[-1])
else:
logger.error("Discarding message of unknown type {}".format(x[0]))

def _update(self, table: str, columns: List[str], messages: List[Dict[str, Any]]) -> None:
try:
Expand Down
14 changes: 7 additions & 7 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from parsl.process_loggers import wrap_with_logs

from parsl.monitoring.message_type import MessageType
from typing import Any, Callable, Dict, List, Optional, Union
from typing import cast, Any, Callable, Dict, List, Optional, Union

_db_manager_excepts: Optional[Exception]

Expand Down Expand Up @@ -483,21 +483,21 @@ def start(self,
try:
msg = self.ic_channel.recv_pyobj()
self.logger.debug("Got ZMQ Message from interchange: {}".format(msg))

assert msg[0] == MessageType.NODE_INFO \
or msg[0] == MessageType.BLOCK_INFO, \
"IC Channel expects only NODE_INFO or BLOCK_INFO and cannot dispatch other message types"

assert isinstance(msg, tuple), "IC Channel expects only tuples, got {}".format(msg)
assert len(msg) >= 1, "IC Channel expects tuples of length at least 1, got {}".format(msg)
if msg[0] == MessageType.NODE_INFO:
assert len(msg) >= 1, "IC Channel expects NODE_INFO tuples of length at least 3, got {}".format(msg)
msg[2]['last_heartbeat'] = datetime.datetime.fromtimestamp(msg[2]['last_heartbeat'])
msg[2]['run_id'] = self.run_id
msg[2]['timestamp'] = msg[1]

# ((tag, dict), addr)
node_msg = ((msg[0], msg[2]), 0)
node_msgs.put(node_msg)
elif msg[0] == MessageType.RESOURCE_INFO:
resource_msgs.put(cast(Any, msg))
elif msg[0] == MessageType.BLOCK_INFO:
block_msgs.put((msg, 0))
block_msgs.put(cast(Any, (msg, 0)))
else:
self.logger.error(f"Discarding message from interchange with unknown type {msg[0].value}")
except zmq.Again:
Expand Down

0 comments on commit 4488c0b

Please sign in to comment.