diff --git a/bin/radical-utils-pwatch b/bin/radical-utils-pwatch index 31132ec57..135d88335 100755 --- a/bin/radical-utils-pwatch +++ b/bin/radical-utils-pwatch @@ -33,7 +33,7 @@ # Depending on what finishes first, the sleep 5 or the random sleep (and thus # the parent shell), the process watcher will kill the other shell. # -# +# # Example 2: # # sleep $((RANDOM % 10)) & RU_PW_PID1=$!; @@ -58,7 +58,7 @@ run_pwatch(){ # check RU_PW_PPID and RU_PW_CPID are set, and use them test -z "$RU_PW_PID1" || pid1=$RU_PW_PID1 test -z "$RU_PW_PID2" || pid2=$RU_PW_PID2 - + # use fallback pid1 (PPID) test -z "$pid1" && pid1=$PPID @@ -66,21 +66,21 @@ run_pwatch(){ # NOTE: we cannot use `test -z $pid2 && $ARGS & pid2=$!` - that would give # us the pid of the subshell, not of the command (at least in bash) if test -z "$pid2" - then + then # eval ensures that args could contain pipelines and I/O redirections, - # but we need to add the backgrounding to arga (if its not there, yet). + # but we need to add the backgrounding to args (if its not there, yet). # Don't background `eval` itself, that results in a subshell pid. ARGS=$(echo "$ARGS" | sed 's/[& ]\+$//') eval "$ARGS &" pid2=$! 
fi - + # ensure we have two PIDs test -z "$pid1" && echo 'missing process to watch (1)' && return 1 test -z "$pid2" && echo 'missing process to watch (2)' && return 2 # echo "pwatch [$pid1] [$pid2]" - + # -------------------------------------------------------------------------- while true do @@ -98,7 +98,7 @@ run_pwatch(){ kill $pid1 return 0 fi - + sleep 1 done } diff --git a/src/radical/utils/__init__.py b/src/radical/utils/__init__.py index 8e009ce9e..00ac1910c 100644 --- a/src/radical/utils/__init__.py +++ b/src/radical/utils/__init__.py @@ -25,7 +25,7 @@ from .object_cache import ObjectCache from .plugin_manager import PluginManager, PluginBase from .singleton import Singleton -from .heartbeat import Heartbeat +from .heartbeat import Heartbeat, PWatcher from .threads import is_main_thread, is_this_thread, cancel_main_thread from .threads import main_thread, this_thread, get_thread_name, gettid from .threads import set_cancellation_handler, unset_cancellation_handler diff --git a/src/radical/utils/heartbeat.py b/src/radical/utils/heartbeat.py index d179b5bd5..5feaf22bf 100644 --- a/src/radical/utils/heartbeat.py +++ b/src/radical/utils/heartbeat.py @@ -1,5 +1,6 @@ import os +import sys import time import pprint import signal @@ -263,5 +264,138 @@ def wait_startup(self, uids=None, timeout=None): self._log.debug_3('wait ok : %s', ok) +# ------------------------------------------------------------------------------ +# +class PWatcher(object): + + NOTHING = 'nothing' + SUICIDE = 'suicide' + KILLALL = 'killall' + RAMPAGE = 'rampage' + + # -------------------------------------------------------------------------- + # + def __init__(self, action=None, uid=None, log=None): + ''' + + This is a simple process monitor: once started it runs in a separate + thread and monitors all process IDs registered via `self.watch(pid)`. 
If + a process is found to have died, the watcher will invoke the given + action: + + - `nothing`: log event and do nothing else + - `suicide`: kill the current process + - `killall`: kill all monitored pids + - `rampage`: both of the above (`suicide + killall`) + + The default action is `rampage`. + + The passed uid (default: `pwatcher`) is used for logging purposes only. + ''' + + self._action = action or self.RAMPAGE + self._uid = uid or 'pwatcher' + self._log = log or Logger(name=self._uid, ns='radical.utils') + self._pids = list() + self._lock = mt.Lock() + + self._log.debug_1('pwatcher create') + + self._thread = mt.Thread(target=self._watch) + self._thread.daemon = True + self._thread.start() + + + # -------------------------------------------------------------------------- + # + def _is_alive(self, pid): + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + + # -------------------------------------------------------------------------- + # + def _watch(self): + + self._log.debug_1('pwatcher started') + + while True: + + with self._lock: + + for pid in list(self._pids): + + if not self._is_alive(pid): + + self._log.warn('process %d died, exit', pid) + self._pids.remove(pid) + + if self._action == self.SUICIDE: self._suicide(pid) + elif self._action == self.KILLALL: self._killall(pid) + elif self._action == self.RAMPAGE: self._rampage(pid) + + time.sleep(0.05) + + + # -------------------------------------------------------------------------- + # + def watch(self, pid): + + self._log.debug('add pid %d to watchlist', pid) + + with self._lock: + self._pids.append(pid) + + + # -------------------------------------------------------------------------- + # + def unwatch(self, pid): + + self._log.debug('remove pid %d from watchlist', pid) + + with self._lock: + if pid in self._pids: + self._pids.remove(pid) + + + # -------------------------------------------------------------------------- + # + def _nothing(self, pid): + + 
self._log.debug("process %d's demise triggered, well, nothing", pid) + + + # -------------------------------------------------------------------------- + # + def _suicide(self, pid): + + self._log.debug("process %d's demise triggered suicide", pid) + os.kill(os.getpid(), signal.SIGKILL) + + + # -------------------------------------------------------------------------- + # + def _killall(self, pid): + + self._log.debug("process %d's demise triggered killall (%s)", + pid, self._pids) + + for pid in list(self._pids): + try : os.kill(pid, signal.SIGKILL) + except: pass + + + # -------------------------------------------------------------------------- + # + def _rampage(self, pid): + + self._killall(pid) + self._suicide(pid) + + # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/logger.py b/src/radical/utils/logger.py index f6b019029..6e892df4b 100644 --- a/src/radical/utils/logger.py +++ b/src/radical/utils/logger.py @@ -233,6 +233,9 @@ def __init__(self, name, ns=None, path=None, targets=None, level=None, settings. """ + if name is None: + raise ValueError('logger name must be specified and not `None`') + self._name = name self._ns = ns self._path = path diff --git a/src/radical/utils/zmq/bridge.py b/src/radical/utils/zmq/bridge.py index 24b554efb..ba7a9ffab 100644 --- a/src/radical/utils/zmq/bridge.py +++ b/src/radical/utils/zmq/bridge.py @@ -142,6 +142,21 @@ def start(self): self._log.info('started bridge %s', self._uid) + # -------------------------------------------------------------------------- + # + def wait(self, timeout=None): + ''' + Wait for the bridge to terminate. If `timeout` is set, the call will + return after at most that many seconds, with a return value indicating + whether the bridge has terminated (`True`) or is still alive (`False`). 
+ ''' + + self._bridge_thread.join(timeout=timeout) + + if timeout is not None: + return not self._bridge_thread.is_alive() + + # -------------------------------------------------------------------------- # @staticmethod diff --git a/tests/bin/test_pwatch.sh b/tests/bin/test_pwatch.sh new file mode 100755 index 000000000..dd7943144 --- /dev/null +++ b/tests/bin/test_pwatch.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +# test the shell pwatcher + +start=$(date +%s) + +sleep 1 & +export RU_PW_PID1=$! + +sleep 3 & +export RU_PW_PID2=$! + +./bin/radical-utils-pwatch + +stop=$(date +%s) +test "$(($stop-$start))" -gt 2 && exit 1 || exit 0 + diff --git a/tests/unittests/test_heartbeat.py b/tests/unittests/test_heartbeat.py index 86c4365bd..c24b97dd7 100755 --- a/tests/unittests/test_heartbeat.py +++ b/tests/unittests/test_heartbeat.py @@ -138,12 +138,159 @@ def proc(): except: pass +# ------------------------------------------------------------------------------ +# +def test_hb_pwatch_py(): + + queue = mp.Queue() + + # pid check + def is_alive(pid): + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + # watcher process + def _watcher(action): + + pwatcher = ru.PWatcher(action=action) + + # create two sleep processes to watch / handle + proc_1 = mp.Process(target=time.sleep, args=(0.1,)) + proc_2 = mp.Process(target=time.sleep, args=(0.4,)) + + proc_1.daemon = True + proc_2.daemon = True + + proc_1.start() + proc_2.start() + + pwatcher.watch(proc_1.pid) + pwatcher.watch(proc_2.pid) + + queue.put([os.getpid(), proc_1.pid, proc_2.pid]) + + # stay alive for a total of 1 second + start = time.time() + proc_1.join(timeout=1) + + remaining = max(0, 1 - (time.time() - start)) + proc_2.join(timeout=remaining) + + remaining = max(0, 1 - (time.time() - start)) + time.sleep(remaining) + + + # NOTE: we cannot use `test_proc.daemon = True` here, as + # daemon procs cannot spawn children in Python :-/ + + # 
-------------------------------------------------------------------------- + # test mode `nothing` + test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.NOTHING]) + test_proc.start() + + pids = queue.get() + + # after 0.2 seconds, the watcher and second sleep should still be alive + time.sleep(0.2) + assert is_alive(pids[0]) + assert not is_alive(pids[1]) + assert is_alive(pids[2]) + + # after another 0.5 seconds, only the watcher should still be alive + time.sleep(0.5) + assert is_alive(pids[0]) + assert not is_alive(pids[1]) + assert not is_alive(pids[2]) + + # after another 1.1 seconds, the watcher should have exited + time.sleep(1.1) + test_proc.join(timeout=0.0) + assert not is_alive(pids[0]) + assert not is_alive(pids[1]) + assert not is_alive(pids[2]) + + # -------------------------------------------------------------------------- + # test mode `suicide` + test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.SUICIDE]) + test_proc.start() + + pids = queue.get() + + # after 0.4 seconds, only the second sleep should still be alive + time.sleep(0.4) + test_proc.join(timeout=0.1) + assert not is_alive(pids[0]) + assert not is_alive(pids[1]) + assert is_alive(pids[2]) + + # after another 0.5 seconds, none of the processes should be alive + time.sleep(0.5) + assert not is_alive(pids[0]) + assert not is_alive(pids[1]) + assert not is_alive(pids[2]) + + + # -------------------------------------------------------------------------- + # test mode `killall` + test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.KILLALL]) + test_proc.start() + + pids = queue.get() + + # after 0.4 seconds, only the watcher should still be alive + time.sleep(0.4) + test_proc.join(timeout=0.1) + assert is_alive(pids[0]) + assert not is_alive(pids[1]) + assert not is_alive(pids[2]) + + # after another 0.5 seconds, none of the processes should be alive + time.sleep(0.5) + test_proc.join(timeout=0.1) + assert not is_alive(pids[0]) + assert not is_alive(pids[1]) + assert not is_alive(pids[2]) + + + # 
-------------------------------------------------------------------------- + # test mode `rampage` + test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.RAMPAGE]) + test_proc.start() + + pids = queue.get() + + # after 0.2 seconds, the first sleep dies and no process should be alive + time.sleep(0.2) + test_proc.join(timeout=0.1) + assert not is_alive(pids[0]) + assert not is_alive(pids[1]) + assert not is_alive(pids[2]) + + +# ------------------------------------------------------------------------------ +# +def test_hb_pwatch_sh(): + + pwd = os.path.dirname(__file__) + script = '%s/../bin/test_pwatch.sh' % pwd + + out, err, ret = ru.sh_callout('%s -h' % script) + + assert ret == 0, [out, err, ret] + + # ------------------------------------------------------------------------------ # run tests if called directly if __name__ == "__main__": - # test_hb_default() + test_hb_default() test_hb_uid() + test_hb_pwatch_py() + test_hb_pwatch_sh() # ------------------------------------------------------------------------------