Skip to content

Commit

Permalink
Fix to macOS multiprocessing spawn and context issues (#2076)
Browse files Browse the repository at this point in the history
* Fix to macOS multiprocessing spawn and context issues

* Further fix to make fork setting be effective for the entire parsl
package, to avoid any explicit handling from the end user side

* Wrap mac process into a class

* Fix for mypy error

Co-authored-by: Ben Clifford <[email protected]>
  • Loading branch information
yongyanrao and benclifford authored Jun 29, 2021
1 parent fbdeb59 commit e87a0aa
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
4 changes: 4 additions & 0 deletions parsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@

from parsl.dataflow.dflow import DataFlowKernel, DataFlowKernelLoader

import multiprocessing
if platform.system() == 'Darwin':
multiprocessing.set_start_method('fork', force=True)

__author__ = 'The Parsl Team'
__version__ = VERSION

Expand Down
10 changes: 10 additions & 0 deletions parsl/executors/high_throughput/mac_safe_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import multiprocessing
from typing import Any

# The Process class is looked up dynamically from a multiprocessing
# context object, so mypy cannot infer its type; annotate as Any.
ForkProcess: Any = multiprocessing.get_context('fork').Process


class MacSafeProcess(ForkProcess):
    """A Process subclass bound to the 'fork' start method.

    Constructed from the 'fork' multiprocessing context so that worker
    processes fork regardless of the platform's default start method
    (used as the macOS-safe replacement for multiprocessing.Process).
    """

    def __init__(self, *args, **kwargs):
        # Pure pass-through: all behavior comes from the fork-context
        # Process base class.
        super().__init__(*args, **kwargs)
38 changes: 20 additions & 18 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
from parsl.executors.high_throughput.probe import probe_addresses
if platform.system() != 'Darwin':
from multiprocessing import Queue as mpQueue
from multiprocessing import Process as mpProcess
else:
from parsl.executors.high_throughput.mac_safe_queue import MacSafeQueue as mpQueue
from parsl.executors.high_throughput.mac_safe_process import MacSafeProcess as mpProcess

from parsl.serialize import unpack_apply_message, serialize

Expand Down Expand Up @@ -361,15 +363,15 @@ def worker_watchdog(self, kill_event):
except KeyError:
logger.info("[WORKER_WATCHDOG_THREAD] Worker {} was not busy when it died".format(worker_id))

p = multiprocessing.Process(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
p = mpProcess(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
self.procs[worker_id] = p
logger.info("[WORKER_WATCHDOG_THREAD] Worker {} has been restarted".format(worker_id))
time.sleep(self.poll_period)
Expand All @@ -387,15 +389,15 @@ def start(self):

self.procs = {}
for worker_id in range(self.worker_count):
p = multiprocessing.Process(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
p = mpProcess(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
p.start()
self.procs[worker_id] = p

Expand Down

0 comments on commit e87a0aa

Please sign in to comment.