Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Oct 11, 2024
1 parent 1b35b9c commit 42f5041
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 9 deletions.
14 changes: 7 additions & 7 deletions bin/radical-utils-pwatch
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# Depending on what finishes first, the sleep 5 or the random sleep (and thus
# the parent shell), the process watcher will kill the other shell.
#
#
#
# Example 2:
#
# sleep $((RANDOM % 10)) & RU_PW_PID1=$!;
Expand All @@ -58,29 +58,29 @@ run_pwatch(){
# check RU_PW_PPID and RU_PW_CPID are set, and use them
test -z "$RU_PW_PID1" || pid1=$RU_PW_PID1
test -z "$RU_PW_PID2" || pid2=$RU_PW_PID2

# use fallback pid1 (PPID)
test -z "$pid1" && pid1=$PPID

# use fallback forpid2: cmd subshell
# NOTE: we cannot use `test -z $pid2 && $ARGS & pid2=$!` - that would give
# us the pid of the subshell, not of the command (at least in bash)
if test -z "$pid2"
then
then
# eval ensures that args could contain pipelines and I/O redirections,
# but we need to add the backgrounding to arga (if its not there, yet).
# but we need to add the backgrounding to args (if its not there, yet).
# Don't background `eval` itself, that results in a subshell pid.
ARGS=$(echo "$ARGS" | sed 's/[& ]\+$//')
eval "$ARGS &"
pid2=$!
fi

# ensure we have two PIDs
test -z "$pid1" && echo 'missing process to watch (1)' && return 1
test -z "$pid2" && echo 'missing process to watch (2)' && return 2

# echo "pwatch [$pid1] [$pid2]"

# --------------------------------------------------------------------------
while true
do
Expand All @@ -98,7 +98,7 @@ run_pwatch(){
kill $pid1
return 0
fi

sleep 1
done
}
Expand Down
2 changes: 1 addition & 1 deletion src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .object_cache import ObjectCache
from .plugin_manager import PluginManager, PluginBase
from .singleton import Singleton
from .heartbeat import Heartbeat
from .heartbeat import Heartbeat, PWatcher
from .threads import is_main_thread, is_this_thread, cancel_main_thread
from .threads import main_thread, this_thread, get_thread_name, gettid
from .threads import set_cancellation_handler, unset_cancellation_handler
Expand Down
132 changes: 132 additions & 0 deletions src/radical/utils/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import os
import sys
import time
import pprint
import signal
Expand Down Expand Up @@ -263,5 +264,136 @@ def wait_startup(self, uids=None, timeout=None):
self._log.debug_3('wait ok : %s', ok)


# ------------------------------------------------------------------------------
#
class PWatcher(object):

NOTHING = 'nothing'
SUICIDE = 'suicide'
KILLALL = 'killall'
RAMPAGE = 'rampage'

# --------------------------------------------------------------------------
#
def __init__(self, action=None, uid=None, log=None):
'''
This is a simple process monitor: once started it runs in a separate
thread and monitors all given process IDs (`self._watch_pid`). If
a process is found to have died, the watcher will invoke the given
action:
- `nothing`: log event and do nothing else
- `suicide`: kill the curent process
- `killall`: kill all monitored pids
- `rampage`: both of the above (`suicide + killall`)
The default action is `rampage`.
The passed uid (default: `pwatcher`) is used for logging purposes only.
'''

self._action = action or self.RAMPAGE
self._uid = uid or 'pwatcher'
self._log = log or Logger(name=uid, ns='radical.utils')
self._pids = list()
self._lock = mt.Lock()

self._log.debug_1('pwatcher create')

self._thread = mt.Thread(target=self._watch)
self._thread.daemon = True
self._thread.start()


# --------------------------------------------------------------------------
#
def _is_alive(self, pid):
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True


# --------------------------------------------------------------------------
#
def _watch(self):

self._log.debug_1('pwatcher started')

while True:

with self._lock:

for pid in self._pids:

if not self._is_alive(pid):

self._log.warn('process %d died, exit', pid)
self._pids.remove(pid)

if self._action == self.SUICIDE: self._suicide(pid)
elif self._action == self.KILLALL: self._killall(pid)
elif self._action == self.RAMPAGE: self._rampage(pid)

time.sleep(0.05)


# --------------------------------------------------------------------------
#
def watch(self, pid):

self._log.debug('add pid %d to watchlist', pid)

with self._lock:
self._pids.append(pid)


# --------------------------------------------------------------------------
#
def unwatch(self, pid):

self._log.debug('remove pid %d from watchlist', pid)

with self._lock:
self._pids.remove(pid)


# --------------------------------------------------------------------------
#
def _nothing(self, pid):

self._log.debug("process %d's demise triggered, well, nothing", pid)


# --------------------------------------------------------------------------
#
def _suicide(self, pid):

self._log.debug("process %d's demise triggered suicide", pid)
os.kill(os.getpid(), signal.SIGKILL)


# --------------------------------------------------------------------------
#
def _killall(self, pid):

self._log.debug("process %d's demise triggered killall (%s)",
pid, self._pids)

for pid in self._pids:
os.kill(pid, signal.SIGKILL)


# --------------------------------------------------------------------------
#
def _rampage(self, pid):

self._killall(pid)
self._suicide(pid)


# ------------------------------------------------------------------------------

17 changes: 17 additions & 0 deletions tests/bin/test_pwatch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/sh

# test the shell pwatcher

start=$(date +%s)

sleep 1 &
export RU_PW_PID1=$!

sleep 3 &
export RU_PW_PID2=$!

./bin/radical-utils-pwatch

stop=$(date +%s)
test "$(($stop-$start))" -gt 2 && exit 1 || exit 0

148 changes: 147 additions & 1 deletion tests/unittests/test_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,158 @@ def proc():
except: pass


# ------------------------------------------------------------------------------
#
def test_hb_pwatch_py():

queue = mp.Queue()

# pid check
def is_alive(pid):
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True

# watcher process
def _watcher(action):

pwatcher = ru.PWatcher(action=action)

# create two sleep processes to watch / handle
proc_1 = mp.Process(target=time.sleep, args=(0.1,))
proc_2 = mp.Process(target=time.sleep, args=(0.4,))

proc_1.daemon = True
proc_2.daemon = True

proc_1.start()
proc_2.start()

pwatcher.watch(proc_1.pid)
pwatcher.watch(proc_2.pid)

queue.put([os.getpid(), proc_1.pid, proc_2.pid])

# sleep for 1 seconds
start = time.time()
proc_1.join(timeout=1)

remaining = max(0, 1 - (time.time() - start))
proc_2.join(timeout=remaining)

remaining = max(0, 1 - (time.time() - start))
time.sleep(remaining)


# NOTE: we cannot use `test_proc.daemon = True` here, as the daemon flag
# damon procs cannot spawn childrin in Python :-/

# --------------------------------------------------------------------------
# test mode `nothing`
test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.NOTHING])
test_proc.start()

pids = queue.get()

# after 0.2 seconds, the watcher and second sleep should still be alive
time.sleep(0.2)
assert is_alive(pids[0])
assert not is_alive(pids[1])
assert is_alive(pids[2])

# after 0.5 seconds, only the watcher should still be alive
time.sleep(0.5)
assert is_alive(pids[0])
assert not is_alive(pids[1])
assert not is_alive(pids[2])

# after 1.1 seconds, the watcher should have exited
time.sleep(1.1)
test_proc.join(timeout=0.0)
assert not is_alive(pids[0])
assert not is_alive(pids[1])
assert not is_alive(pids[2])

# --------------------------------------------------------------------------
# test mode `suicide`
test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.SUICIDE])
test_proc.start()

pids = queue.get()

# after 0.2 seconds, only second sleep should still be alive
time.sleep(0.4)
test_proc.join(timeout=0.1)
assert not is_alive(pids[0])
assert not is_alive(pids[1])
assert is_alive(pids[2])

# after 0.5 seconds, none of the processes should be alive
time.sleep(0.5)
assert not is_alive(pids[0])
assert not is_alive(pids[1])
assert not is_alive(pids[2])


# --------------------------------------------------------------------------
# test mode `killall`
test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.KILLALL])
test_proc.start()

pids = queue.get()

# after 0.2 seconds, only second sleep should still be alive
time.sleep(0.4)
test_proc.join(timeout=0.1)
assert is_alive(pids[0])
assert not is_alive(pids[1])
assert not is_alive(pids[2])

# after 0.5 seconds, none of the processes should be alive
time.sleep(0.5)
test_proc.join(timeout=0.1)
assert not is_alive(pids[0])
assert not is_alive(pids[1])
assert not is_alive(pids[2])


# --------------------------------------------------------------------------
# test mode `rampage`
test_proc = mp.Process(target=_watcher, args=[ru.PWatcher.RAMPAGE])
test_proc.start()

pids = queue.get()

# after 0.2 seconds, only second sleep should still be alive
time.sleep(0.4)
test_proc.join(timeout=0.1)
assert not is_alive(pids[0])
assert not is_alive(pids[1])
assert not is_alive(pids[2])

# ------------------------------------------------------------------------------
#
def test_hb_pwatch_sh():

pwd = os.path.dirname(__file__)
script = '%s/../bin/test_pwatch.sh' % pwd

out, err, ret = ru.sh_callout('%s -h' % script)

assert ret == 0, [out, err, ret]


# ------------------------------------------------------------------------------
# run tests if called directly
if __name__ == "__main__":

# test_hb_default()
test_hb_default()
test_hb_uid()
test_hb_pwatch_py()
test_hb_pwatch_sh()


# ------------------------------------------------------------------------------
Expand Down

0 comments on commit 42f5041

Please sign in to comment.