diff --git a/parsl/__init__.py b/parsl/__init__.py
index ab42eaa05a..191c75d467 100644
--- a/parsl/__init__.py
+++ b/parsl/__init__.py
@@ -33,6 +33,10 @@
 from parsl.dataflow.dflow import DataFlowKernel, DataFlowKernelLoader
 
+import multiprocessing
+if platform.system() == 'Darwin':
+    multiprocessing.set_start_method('fork', force=True)
+
 __author__ = 'The Parsl Team'
 
 __version__ = VERSION
 
diff --git a/parsl/executors/high_throughput/mac_safe_process.py b/parsl/executors/high_throughput/mac_safe_process.py
new file mode 100644
index 0000000000..9563b5255e
--- /dev/null
+++ b/parsl/executors/high_throughput/mac_safe_process.py
@@ -0,0 +1,10 @@
+import multiprocessing
+from typing import Any
+
+ForkProcess: Any = multiprocessing.get_context('fork').Process
+
+
+class MacSafeProcess(ForkProcess):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py
index 7e6493b142..31e33bd401 100755
--- a/parsl/executors/high_throughput/process_worker_pool.py
+++ b/parsl/executors/high_throughput/process_worker_pool.py
@@ -24,8 +24,10 @@
 from parsl.executors.high_throughput.probe import probe_addresses
 
 if platform.system() != 'Darwin':
     from multiprocessing import Queue as mpQueue
+    from multiprocessing import Process as mpProcess
 else:
     from parsl.executors.high_throughput.mac_safe_queue import MacSafeQueue as mpQueue
+    from parsl.executors.high_throughput.mac_safe_process import MacSafeProcess as mpProcess
 
 from parsl.serialize import unpack_apply_message, serialize
@@ -361,15 +363,15 @@ def worker_watchdog(self, kill_event):
                     except KeyError:
                         logger.info("[WORKER_WATCHDOG_THREAD] Worker {} was not busy when it died".format(worker_id))
 
-                    p = multiprocessing.Process(target=worker, args=(worker_id,
-                                                                     self.uid,
-                                                                     self.worker_count,
-                                                                     self.pending_task_queue,
-                                                                     self.pending_result_queue,
-                                                                     self.ready_worker_queue,
-                                                                     self._tasks_in_progress,
-                                                                     self.cpu_affinity
-                                                                     ), name="HTEX-Worker-{}".format(worker_id))
+                    p = mpProcess(target=worker, args=(worker_id,
+                                                       self.uid,
+                                                       self.worker_count,
+                                                       self.pending_task_queue,
+                                                       self.pending_result_queue,
+                                                       self.ready_worker_queue,
+                                                       self._tasks_in_progress,
+                                                       self.cpu_affinity
+                                                       ), name="HTEX-Worker-{}".format(worker_id))
                     self.procs[worker_id] = p
                     logger.info("[WORKER_WATCHDOG_THREAD] Worker {} has been restarted".format(worker_id))
             time.sleep(self.poll_period)
@@ -387,15 +389,15 @@ def start(self):
 
         self.procs = {}
         for worker_id in range(self.worker_count):
-            p = multiprocessing.Process(target=worker, args=(worker_id,
-                                                             self.uid,
-                                                             self.worker_count,
-                                                             self.pending_task_queue,
-                                                             self.pending_result_queue,
-                                                             self.ready_worker_queue,
-                                                             self._tasks_in_progress,
-                                                             self.cpu_affinity
-                                                             ), name="HTEX-Worker-{}".format(worker_id))
+            p = mpProcess(target=worker, args=(worker_id,
+                                               self.uid,
+                                               self.worker_count,
+                                               self.pending_task_queue,
+                                               self.pending_result_queue,
+                                               self.ready_worker_queue,
+                                               self._tasks_in_progress,
+                                               self.cpu_affinity
+                                               ), name="HTEX-Worker-{}".format(worker_id))
             p.start()
             self.procs[worker_id] = p
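
Background sketch (not part of the patch): since Python 3.8 the default multiprocessing start method on macOS is 'spawn', under which a child process starts a fresh interpreter, re-imports the module instead of inheriting the parent's memory, and requires every argument to be picklable. The changes above work around that by forcing the 'fork' start method globally in parsl/__init__.py and by giving the worker pool a fork-based Process class (MacSafeProcess). Below is a minimal, self-contained illustration of the fork-context pattern; the names fork_ctx, parent_state, and child are illustrative only and do not appear in the patch.

    import multiprocessing

    # A fork context is pinned to the fork() start method, independent of the
    # interpreter-wide default ('spawn' on macOS since Python 3.8).
    fork_ctx = multiprocessing.get_context('fork')

    parent_state = {}  # module-level state, filled in by the parent *after* import


    def child(result_queue):
        # Under fork the child inherits the parent's memory at fork time, so the
        # value set in __main__ below is visible here without any pickling.
        # Under spawn this module would be re-imported and parent_state would be empty.
        result_queue.put(parent_state.get("token"))


    if __name__ == '__main__':
        parent_state["token"] = "set in parent, after import"
        queue = fork_ctx.Queue()
        p = fork_ctx.Process(target=child, args=(queue,), name="fork-demo-worker")
        p.start()
        print(queue.get())  # prints "set in parent, after import"
        p.join()

The set_start_method('fork', force=True) call in parsl/__init__.py achieves this behaviour globally on Darwin, while MacSafeProcess pins it per-class for the HTEX worker pool processes.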