Skip to content

Commit

Permalink
Switch to stdin for config object transmission
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jun 12, 2024
1 parent e7d18aa commit c9a7c2c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
19 changes: 11 additions & 8 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import logging
import math
import pickle
Expand All @@ -9,7 +8,7 @@
from collections import defaultdict
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import IO, Callable, Dict, List, Optional, Sequence, Tuple, Union

import typeguard

Expand Down Expand Up @@ -543,18 +542,22 @@ def _start_local_interchange_process(self) -> None:
"cert_dir": self.cert_dir,
}

encoded = base64.b64encode(pickle.dumps(interchange_config))
config_pickle = pickle.dumps(interchange_config)

cmd: List[bytes] = [b"interchange.py",
encoded
]
self.interchange_proc = subprocess.Popen(cmd)
self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE)
stdin = self.interchange_proc.stdin
assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"

logger.debug("Popened interchange process. Writing config object")
stdin.write(config_pickle)
stdin.flush()
logger.debug("Sent config object. Requesting worker ports")
try:
(self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120)
except CommandClientTimeoutError:
logger.error("Interchange has not completed initialization in 120s. Aborting")
logger.error("Interchange has not completed initialization. Aborting")
raise Exception("Interchange failed to start")
logger.debug("Got worker ports")

def _start_queue_management_thread(self):
"""Method to start the management thread as a daemon.
Expand Down
3 changes: 1 addition & 2 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python
import base64
import datetime
import json
import logging
Expand Down Expand Up @@ -676,7 +675,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
if __name__ == "__main__":
setproctitle("parsl: HTEX interchange")

config = pickle.loads(base64.b64decode(sys.argv[1]))
config = pickle.load(sys.stdin.buffer)

ic = Interchange(**config)
ic.start()

0 comments on commit c9a7c2c

Please sign in to comment.