Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/simple hb #420

Merged
merged 7 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions bin/radical-utils-pwatch
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# Depending on what finishes first, the sleep 5 or the random sleep (and thus
# the parent shell), the process watcher will kill the other shell.
#
#
#
# Example 2:
#
# sleep $((RANDOM % 10)) & RU_PW_PID1=$!;
Expand All @@ -58,29 +58,29 @@ run_pwatch(){
# check if RU_PW_PID1 and RU_PW_PID2 are set, and use them
test -z "$RU_PW_PID1" || pid1=$RU_PW_PID1
test -z "$RU_PW_PID2" || pid2=$RU_PW_PID2

# use fallback pid1 (PPID)
test -z "$pid1" && pid1=$PPID

# use fallback for pid2: cmd subshell
# NOTE: we cannot use `test -z $pid2 && $ARGS & pid2=$!` - that would give
# us the pid of the subshell, not of the command (at least in bash)
if test -z "$pid2"
then
then
# eval ensures that args could contain pipelines and I/O redirections,
# but we need to add the backgrounding to arga (if its not there, yet).
# but we need to add the backgrounding to args (if it's not there yet).
# Don't background `eval` itself, that results in a subshell pid.
ARGS=$(echo "$ARGS" | sed 's/[& ]\+$//')
eval "$ARGS &"
pid2=$!
fi

# ensure we have two PIDs
test -z "$pid1" && echo 'missing process to watch (1)' && return 1
test -z "$pid2" && echo 'missing process to watch (2)' && return 2

# echo "pwatch [$pid1] [$pid2]"

# --------------------------------------------------------------------------
while true
do
Expand All @@ -98,7 +98,7 @@ run_pwatch(){
kill $pid1
return 0
fi

sleep 1
done
}
Expand Down
2 changes: 1 addition & 1 deletion src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .object_cache import ObjectCache
from .plugin_manager import PluginManager, PluginBase
from .singleton import Singleton
from .heartbeat import Heartbeat
from .heartbeat import Heartbeat, PWatcher
from .threads import is_main_thread, is_this_thread, cancel_main_thread
from .threads import main_thread, this_thread, get_thread_name, gettid
from .threads import set_cancellation_handler, unset_cancellation_handler
Expand Down
132 changes: 132 additions & 0 deletions src/radical/utils/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import os
import sys
import time
import pprint
import signal
Expand Down Expand Up @@ -263,5 +264,136 @@ def wait_startup(self, uids=None, timeout=None):
self._log.debug_3('wait ok : %s', ok)


# ------------------------------------------------------------------------------
#
class PWatcher(object):
    '''
    Simple process monitor which watches a set of process IDs from a daemon
    thread and reacts when one of them disappears.
    '''

    NOTHING = 'nothing'
    SUICIDE = 'suicide'
    KILLALL = 'killall'
    RAMPAGE = 'rampage'

    # --------------------------------------------------------------------------
    #
    def __init__(self, action=None, uid=None, log=None):
        '''
        This is a simple process monitor: once created it runs in a separate
        daemon thread and monitors all process IDs registered via
        `watch(pid)`.  If a process is found to have died, the watcher will
        invoke the given action:

          - `nothing`: log event and do nothing else
          - `suicide`: kill the current process
          - `killall`: kill all monitored pids
          - `rampage`: both of the above (`killall + suicide`)

        The default action is `rampage`.

        The passed uid (default: `pwatcher`) is used for logging purposes only.
        If no `log` is passed, a logger named after the uid is created.
        '''

        self._action = action or self.RAMPAGE
        self._uid    = uid    or 'pwatcher'

        # use the resolved uid so that the default name applies to the logger
        self._log    = log    or Logger(name=self._uid, ns='radical.utils')
        self._pids   = list()
        self._lock   = mt.Lock()

        self._log.debug_1('pwatcher create')

        # all state above must exist before the watcher thread starts
        self._thread = mt.Thread(target=self._watch)
        self._thread.daemon = True
        self._thread.start()


    # --------------------------------------------------------------------------
    #
    def _is_alive(self, pid):
        '''
        Return `True` if a process with the given pid exists, `False`
        otherwise.  Signal `0` performs the existence check only, no signal is
        delivered.
        '''

        try:
            os.kill(pid, 0)

        except PermissionError:
            # the process exists, we are just not allowed to signal it
            return True

        except OSError:
            return False

        return True


    # --------------------------------------------------------------------------
    #
    def _watch(self):
        '''
        Thread body: poll the watched pids every 0.05 seconds, forever.
        '''

        self._log.debug_1('pwatcher started')

        while True:

            self._check()

            # sleep outside of the lock so that `watch()` / `unwatch()` are
            # not blocked between polls
            time.sleep(0.05)


    # --------------------------------------------------------------------------
    #
    def _check(self):
        '''
        Run a single polling pass: drop every dead pid from the watch list and
        trigger the configured action for it.
        '''

        with self._lock:

            # iterate over a snapshot: dead pids are removed from the list
            # while we go, and removing from a list under iteration would
            # skip the neighbouring entry
            for pid in list(self._pids):

                if self._is_alive(pid):
                    continue

                self._log.warn('process %d died, exit', pid)
                self._pids.remove(pid)

                if   self._action == self.NOTHING: self._nothing(pid)
                elif self._action == self.SUICIDE: self._suicide(pid)
                elif self._action == self.KILLALL: self._killall(pid)
                elif self._action == self.RAMPAGE: self._rampage(pid)


    # --------------------------------------------------------------------------
    #
    def watch(self, pid):
        '''
        Add `pid` to the list of monitored processes.
        '''

        self._log.debug('add pid %d to watchlist', pid)

        with self._lock:
            self._pids.append(pid)


    # --------------------------------------------------------------------------
    #
    def unwatch(self, pid):
        '''
        Remove `pid` from the list of monitored processes.  It is not an error
        if the pid is not (or not anymore) watched: the watcher thread drops
        dead pids on its own, so the caller cannot know the list's content.
        '''

        self._log.debug('remove pid %d from watchlist', pid)

        with self._lock:
            try:
                self._pids.remove(pid)
            except ValueError:
                self._log.debug('pid %d is not on watchlist', pid)


    # --------------------------------------------------------------------------
    #
    def _nothing(self, pid):
        '''
        Action `nothing`: only log the event.
        '''

        self._log.debug("process %d's demise triggered, well, nothing", pid)


    # --------------------------------------------------------------------------
    #
    def _suicide(self, pid):
        '''
        Action `suicide`: send `SIGKILL` to the current process.
        '''

        self._log.debug("process %d's demise triggered suicide", pid)
        os.kill(os.getpid(), signal.SIGKILL)


    # --------------------------------------------------------------------------
    #
    def _killall(self, pid):
        '''
        Action `killall`: send `SIGKILL` to all pids which are still on the
        watch list.  Called with `self._lock` held by `_check()`.
        '''

        self._log.debug("process %d's demise triggered killall (%s)",
                        pid, self._pids)

        for other in list(self._pids):

            try:
                os.kill(other, signal.SIGKILL)

            except ProcessLookupError:
                # already gone - nothing left to kill
                self._log.debug('process %d is already gone', other)

            except OSError as e:
                # do not let one failure abort the watcher thread or prevent
                # the remaining pids (and a subsequent suicide) from being
                # handled
                self._log.warn('failed to kill process %d: %s', other, e)


    # --------------------------------------------------------------------------
    #
    def _rampage(self, pid):
        '''
        Action `rampage`: kill all watched pids, then the current process.
        '''

        self._killall(pid)
        self._suicide(pid)


# ------------------------------------------------------------------------------

15 changes: 15 additions & 0 deletions src/radical/utils/zmq/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ def start(self):
self._log.info('started bridge %s', self._uid)


# --------------------------------------------------------------------------
#
def wait(self, timeout=None):
    '''
    Block until the bridge thread has terminated, or until `timeout` seconds
    have passed if a timeout is given.

    With a timeout, the call returns `True` if the bridge thread has
    terminated and `False` if it is still alive.  Without a timeout the call
    only returns once the thread is done, and the return value is `None`.
    '''

    bridge = self._bridge_thread
    bridge.join(timeout=timeout)

    # no timeout: the join above only returns after termination
    if timeout is None:
        return None

    return not bridge.is_alive()


# --------------------------------------------------------------------------
#
@staticmethod
Expand Down
17 changes: 17 additions & 0 deletions tests/bin/test_pwatch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/sh

# Test the shell pwatcher (`bin/radical-utils-pwatch`).
#
# Two background sleeps (1s and 3s) are handed to the watcher via the
# RU_PW_PID1 / RU_PW_PID2 environment variables.  The test passes if the
# watcher returns within about 2 seconds, i.e. it did not wait for the 3s
# sleep to complete on its own.
#
# NOTE: the watcher is addressed by relative path, so this script needs to be
#       run from the repository root.

start=$(date +%s)

sleep 1 &
export RU_PW_PID1=$!

sleep 3 &
export RU_PW_PID2=$!

./bin/radical-utils-pwatch
ret=$?

stop=$(date +%s)

# 127: command not found, 126: found but not executable.  Without this check
# the script would finish immediately and the timing test below would pass
# although the watcher never ran.
if test "$ret" -eq 126 || test "$ret" -eq 127
then
    echo "could not run ./bin/radical-utils-pwatch" >&2
    exit 1
fi

if test "$((stop - start))" -gt 2
then
    exit 1
fi

exit 0

Loading
Loading