Skip to content

Commit

Permalink
work for #37
Browse files Browse the repository at this point in the history
  • Loading branch information
donaldcampbelljr committed Jun 10, 2024
1 parent ffc569e commit 463aeb2
Showing 1 changed file with 92 additions and 1 deletion.
93 changes: 92 additions & 1 deletion looper/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import logging
import os
import subprocess
import signal
import psutil
import sys
import time
import yaml
from math import ceil
Expand Down Expand Up @@ -189,6 +192,7 @@ def __init__(
the project level, rather that on the sample level)
"""
super(SubmissionConductor, self).__init__()

self.collate = collate
self.section_key = PROJECT_PL_KEY if self.collate else SAMPLE_PL_KEY
self.pl_iface = pipeline_interface
Expand All @@ -210,6 +214,7 @@ def __init__(
self._curr_size = 0
self._failed_sample_names = []
self._curr_skip_pool = []
self.process_id = None # this is used for a submitted subprocess

if self.extra_pipe_args:
_LOGGER.debug(
Expand Down Expand Up @@ -392,6 +397,10 @@ def submit(self, force=False):
not for dry run)
"""
submitted = False

# Override signal handler so that Ctrl+C can be used to gracefully terminate child process
signal.signal(signal.SIGINT, self._signal_int_handler)

if not self._pool:
_LOGGER.debug("No submission (no pooled samples): %s", self.pl_name)
# submitted = False
Expand Down Expand Up @@ -421,7 +430,9 @@ def submit(self, force=False):
# Capture submission command return value so that we can
# intercept and report basic submission failures; #167
try:
subprocess.check_call(submission_command, shell=True)
process = subprocess.Popen(submission_command, shell=True)
self.process_id = process.pid
return_code = process.wait()
except subprocess.CalledProcessError:
fails = (
"" if self.collate else [s.sample_name for s in self._samples]
Expand Down Expand Up @@ -489,6 +500,86 @@ def _sample_lump_name(self, pool):
# name concordant with 1-based, not 0-based indexing.
return "lump{}".format(self._num_total_job_submissions + 1)

def _signal_int_handler(self, signal, frame):
"""
For catching interrupt (Ctrl +C) signals. Fails gracefully.
"""
signal_type = "SIGINT"
self._generic_signal_handler(signal_type)

def _generic_signal_handler(self, signal_type):
"""
Function for handling both SIGTERM and SIGINT
"""
message = "Received " + signal_type + ". Failing gracefully..."
_LOGGER.warning(msg=message)

self._terminate_current_subprocess()

sys.exit(1)

def _terminate_current_subprocess(self):

def pskill(proc_pid, sig=signal.SIGINT):
parent_process = psutil.Process(proc_pid)
for child_proc in parent_process.children(recursive=True):
child_proc.send_signal(sig)
parent_process.send_signal(sig)

if self.process_id is None:
return

# Gently wait for the subprocess before attempting to kill it
sys.stdout.flush()
still_running = self._attend_process(psutil.Process(self.process_id), 0)
sleeptime = 0.25
time_waiting = 0

while still_running and time_waiting < 3:
try:
if time_waiting > 2:
pskill(self.process_id, signal.SIGKILL)
elif time_waiting > 1:
pskill(self.process_id, signal.SIGTERM)
else:
pskill(self.process_id, signal.SIGINT)

except OSError:
# This would happen if the child process ended between the check
# and the next kill step
still_running = False
time_waiting = time_waiting + sleeptime

# Now see if it's still running
time_waiting = time_waiting + sleeptime
if not self._attend_process(psutil.Process(self.process_id), sleeptime):
still_running = False

if still_running:
_LOGGER.warning(f"Unable to halt child process: {self.process_id}")
else:
if time_waiting > 0:
note = f"terminated after {time_waiting} sec"
else:
note = "was already terminated"
_LOGGER.warning(msg=f"Child process {self.process_id} {note}.")

def _attend_process(self, proc, sleeptime):
"""
Waits on a process for a given time to see if it finishes, returns True
if it's still running after the given time or False as soon as it
returns.
:param psutil.Process proc: Process object opened by psutil.Popen()
:param float sleeptime: Time to wait
:return bool: True if process is still running; otherwise false
"""
try:
proc.wait(timeout=int(sleeptime))
except psutil.TimeoutExpired:
return True
return False

def _jobname(self, pool):
"""Create the name for a job submission."""
return "{}_{}".format(self.pl_iface.pipeline_name, self._sample_lump_name(pool))
Expand Down

0 comments on commit 463aeb2

Please sign in to comment.