diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 8737ccca67..190d8fc153 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -24,8 +24,11 @@ from parsl.executors.high_throughput.probe import probe_addresses if platform.system() != 'Darwin': from multiprocessing import Queue as mpQueue + mpProcess = multiprocessing.Process else: from parsl.executors.high_throughput.mac_safe_queue import MacSafeQueue as mpQueue + multiprocessing.set_start_method('fork', force=True) + mpProcess = multiprocessing.get_context('fork').Process from parsl.serialize import unpack_apply_message, serialize @@ -361,15 +364,15 @@ def worker_watchdog(self, kill_event): except KeyError: logger.info("[WORKER_WATCHDOG_THREAD] Worker {} was not busy when it died".format(worker_id)) - p = multiprocessing.Process(target=worker, args=(worker_id, - self.uid, - self.worker_count, - self.pending_task_queue, - self.pending_result_queue, - self.ready_worker_queue, - self._tasks_in_progress, - self.cpu_affinity - ), name="HTEX-Worker-{}".format(worker_id)) + p = mpProcess(target=worker, args=(worker_id, + self.uid, + self.worker_count, + self.pending_task_queue, + self.pending_result_queue, + self.ready_worker_queue, + self._tasks_in_progress, + self.cpu_affinity + ), name="HTEX-Worker-{}".format(worker_id)) self.procs[worker_id] = p logger.info("[WORKER_WATCHDOG_THREAD] Worker {} has been restarted".format(worker_id)) time.sleep(self.poll_period) @@ -387,15 +390,15 @@ def start(self): self.procs = {} for worker_id in range(self.worker_count): - p = multiprocessing.Process(target=worker, args=(worker_id, - self.uid, - self.worker_count, - self.pending_task_queue, - self.pending_result_queue, - self.ready_worker_queue, - self._tasks_in_progress, - self.cpu_affinity - ), name="HTEX-Worker-{}".format(worker_id)) + p = mpProcess(target=worker, args=(worker_id, + self.uid, + self.worker_count, + self.pending_task_queue, + self.pending_result_queue, + self.ready_worker_queue, + self._tasks_in_progress, + self.cpu_affinity + ), name="HTEX-Worker-{}".format(worker_id)) p.start() self.procs[worker_id] = p