Enable HTEX worker to check manager is alive (#2970)
The HTEX worker will periodically check that its parent manager process
is still alive. If the manager process dies without first sending the
worker a SIGTERM, the worker will shut itself down.
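For reference, the check relies on the POSIX convention that sending signal 0 with os.kill() delivers nothing to the target but still raises OSError when the PID no longer exists. A minimal standalone sketch of that idea (parent_is_alive, run_until_parent_dies, and poll_interval are illustrative names, not part of this commit):

import os
import time


def parent_is_alive(parent_pid: int) -> bool:
    """Return True while a process with PID parent_pid still exists."""
    try:
        os.kill(parent_pid, 0)  # signal 0: error checking only, nothing is delivered
    except OSError:
        return False
    return True


def run_until_parent_dies(parent_pid: int, poll_interval: float = 1.0) -> None:
    """Poll the parent PID and return once it disappears."""
    while parent_is_alive(parent_pid):
        time.sleep(poll_interval)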
rjmello authored Dec 11, 2023
1 parent 24ae950 commit 982fdca
Showing 2 changed files with 80 additions and 5 deletions.
33 changes: 28 additions & 5 deletions parsl/executors/high_throughput/process_worker_pool.py
@@ -490,6 +490,8 @@ def _start_worker(self, worker_id: int):
                 self.cpu_affinity,
                 self.available_accelerators[worker_id] if self.accelerators_available else None,
                 self.block_id,
+                self.heartbeat_period,
+                os.getpid(),
                 args.logdir,
                 args.debug,
             ),
@@ -541,6 +543,8 @@ def worker(
         cpu_affinity: Union[str, bool],
         accelerator: Optional[str],
         block_id: str,
+        task_queue_timeout: int,
+        manager_pid: int,
         logdir: str,
         debug: bool,
 ):
@@ -611,18 +615,37 @@ def worker(

         logger.info(f'Pinned worker to accelerator: {accelerator}')

-    while True:
-        with ready_worker_count.get_lock():
-            ready_worker_count.value += 1
+    def manager_is_alive():
+        try:
+            # This does not kill the process, but instead raises
+            # an exception if the process doesn't exist
+            os.kill(manager_pid, 0)
+        except OSError:
+            logger.critical(f"Manager ({manager_pid}) died; worker {worker_id} shutting down")
+            return False
+        else:
+            return True
+
+    worker_enqueued = False
+    while manager_is_alive():
+        if not worker_enqueued:
+            with ready_worker_count.get_lock():
+                ready_worker_count.value += 1
+            worker_enqueued = True
+
+        try:
+            # The worker will receive {'task_id':<tid>, 'buffer':<buf>}
+            req = task_queue.get(timeout=task_queue_timeout)
+        except queue.Empty:
+            continue

-        # The worker will receive {'task_id':<tid>, 'buffer':<buf>}
-        req = task_queue.get()
         tasks_in_progress[worker_id] = req
         tid = req['task_id']
         logger.info("Received executor task {}".format(tid))

         with ready_worker_count.get_lock():
             ready_worker_count.value -= 1
+        worker_enqueued = False

         try:
             result = execute_task(req['buffer'])
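The other half of the change is the switch from a blocking task_queue.get() to a bounded get(timeout=...): because heartbeat_period is passed in as task_queue_timeout, an idle worker wakes up at least once per heartbeat period, giving the loop a chance to re-check the manager. A simplified sketch of that loop shape (worker_loop, handle, and poll_timeout are illustrative stand-ins, not the exact Parsl code):

import os
import queue


def worker_loop(task_queue: queue.Queue, manager_pid: int, poll_timeout: float, handle) -> None:
    """Process tasks until the owning (manager) process disappears."""

    def manager_is_alive() -> bool:
        try:
            os.kill(manager_pid, 0)  # raises OSError once the manager PID is gone
        except OSError:
            return False
        return True

    while manager_is_alive():
        try:
            # A bounded get() guarantees the loop condition is re-evaluated
            # at least every poll_timeout seconds, even when no tasks arrive.
            req = task_queue.get(timeout=poll_timeout)
        except queue.Empty:
            continue
        handle(req)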
52 changes: 52 additions & 0 deletions parsl/tests/test_error_handling/test_htex_manager_failure.py
@@ -0,0 +1,52 @@
import os
import signal
import time

import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.htex_local import fresh_config


@pytest.fixture(autouse=True, scope="function")
def load_config():
    config = fresh_config()
    config.executors[0].poll_period = 1
    config.executors[0].max_workers = 1
    config.executors[0].heartbeat_period = 1

    parsl.load(config)
    yield

    parsl.dfk().cleanup()
    parsl.clear()


@python_app
def get_worker_pid():
    import os
    return os.getpid()


@python_app
def kill_manager(sig: int):
    import os
    os.kill(os.getppid(), sig)


@pytest.mark.local
@pytest.mark.parametrize("sig", [signal.SIGTERM, signal.SIGKILL])
def test_htex_manager_failure_worker_shutdown(sig: int):
    """Ensure that HTEX workers shut down when the Manager process dies."""
    worker_pid = get_worker_pid().result()

    kill_manager(sig)

    with pytest.raises(OSError):
        end = time.monotonic() + 5
        while time.monotonic() < end:
            # Raises an exception if the process
            # does not exist
            os.kill(worker_pid, 0)
            time.sleep(.1)
