Skip to content

Commit

Permalink
Factor radio selection in monitoring worker-side code (#3432)
Browse files Browse the repository at this point in the history
Co-authored-by: Sicheng Zhou <[email protected]>
  • Loading branch information
benclifford and ClaudiaCumberbatch authored May 16, 2024
1 parent 441a369 commit ccea519
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
return (wrapped, args, new_kwargs)


def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio:
radio: MonitoringRadio
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
return radio


@wrap_with_logs
def send_first_message(try_id: int,
task_id: int,
Expand All @@ -122,18 +138,7 @@ def send_first_last_message(try_id: int,
import platform
import os

radio: MonitoringRadio
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir)

msg = (MessageType.RESOURCE_INFO,
{'run_id': run_id,
Expand Down Expand Up @@ -178,18 +183,7 @@ def monitor(pid: int,

setproctitle("parsl: task resource monitor")

radio: MonitoringRadio
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir)

logging.debug("start of monitor")

Expand Down

0 comments on commit ccea519

Please sign in to comment.