Skip to content

Commit

Permalink
Fix to macos multiprocessing spawn and context issues
Browse files Browse the repository at this point in the history
  • Loading branch information
yongyanrao committed Jun 3, 2021
1 parent b657f65 commit 7e35079
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
from parsl.executors.high_throughput.probe import probe_addresses
if platform.system() != 'Darwin':
from multiprocessing import Queue as mpQueue
mpProcess = multiprocessing.Process
else:
from parsl.executors.high_throughput.mac_safe_queue import MacSafeQueue as mpQueue
multiprocessing.set_start_method('fork', force=True)
mpProcess = multiprocessing.get_context('fork').Process

from parsl.serialize import unpack_apply_message, serialize

Expand Down Expand Up @@ -361,15 +364,15 @@ def worker_watchdog(self, kill_event):
except KeyError:
logger.info("[WORKER_WATCHDOG_THREAD] Worker {} was not busy when it died".format(worker_id))

p = multiprocessing.Process(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
p = mpProcess(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
self.procs[worker_id] = p
logger.info("[WORKER_WATCHDOG_THREAD] Worker {} has been restarted".format(worker_id))
time.sleep(self.poll_period)
Expand All @@ -387,15 +390,15 @@ def start(self):

self.procs = {}
for worker_id in range(self.worker_count):
p = multiprocessing.Process(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
p = mpProcess(target=worker, args=(worker_id,
self.uid,
self.worker_count,
self.pending_task_queue,
self.pending_result_queue,
self.ready_worker_queue,
self._tasks_in_progress,
self.cpu_affinity
), name="HTEX-Worker-{}".format(worker_id))
p.start()
self.procs[worker_id] = p

Expand Down

0 comments on commit 7e35079

Please sign in to comment.