Retry Slurm interactions more #4869

Merged: 8 commits, Apr 17, 2024
docs/running/cliOptions.rst (4 additions & 0 deletions)
@@ -208,6 +208,10 @@ levels in toil are based on priority from the logging module:
the waiting period. Only works for grid engine batch
systems such as gridengine, htcondor, torque, slurm,
and lsf.
--statePollingTimeout STATEPOLLINGTIMEOUT
Time, in seconds, to retry against a broken scheduler.
Only works for grid engine batch systems such as
gridengine, htcondor, torque, slurm, and lsf.
--batchLogsDir BATCHLOGSDIR
Directory to tell the backing batch system to log into.
Should be available on both the leader and the workers,
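(Illustrative aside, not part of the diff.) A minimal sketch of how the new flag would be passed to a Toil workflow, assuming the standard `Job.Runner` argument parser; the workflow body and job store path are invented for the example:

```python
from toil.common import Toil
from toil.job import Job


def hello(job, name):
    # Trivial job body, only here to give the workflow something to run.
    job.log(f"Hello from {name}")


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    # Invoked e.g. as:
    #   python workflow.py file:my-jobstore --batchSystem slurm \
    #       --statePollingWait 60 --statePollingTimeout 1200
    options = parser.parse_args()
    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(hello, "Slurm"))
```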
src/toil/batchSystems/abstractGridEngineBatchSystem.py (19 additions & 16 deletions)
Expand Up @@ -25,6 +25,7 @@
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS

logger = logging.getLogger(__name__)

@@ -64,6 +65,8 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
self.boss = boss
self.boss.config.statePollingWait = \
self.boss.config.statePollingWait or self.boss.getWaitDuration()
self.boss.config.state_polling_timeout = \
self.boss.config.state_polling_timeout or self.boss.config.statePollingWait * 10
self.newJobsQueue = newJobsQueue
self.updatedJobsQueue = updatedJobsQueue
self.killQueue = killQueue
@@ -175,7 +178,8 @@ def killJobs(self):
while killList:
for jobID in list(killList):
batchJobID = self.getBatchSystemID(jobID)
if self.boss.with_retries(self.getJobExitCode, batchJobID) is not None:
exit_code = self.boss.with_retries(self.getJobExitCode, batchJobID)
if exit_code is not None:
logger.debug('Adding jobID %s to killedJobsQueue', jobID)
self.killedJobsQueue.put(jobID)
killList.remove(jobID)
@@ -503,21 +507,20 @@ def sleepSeconds(self, sleeptime=1):

def with_retries(self, operation, *args, **kwargs):
"""
Call operation with args and kwargs. If one of the calls to an SGE
command fails, sleep and try again for a set number of times.
Call operation with args and kwargs. If one of the calls to a
command fails, sleep and try again.
"""
maxTries = 3
tries = 0
while True:
tries += 1
try:
return operation(*args, **kwargs)
except CalledProcessErrorStderr as err:
if tries < maxTries:
logger.error("Will retry errored operation %s, code %d: %s",
operation.__name__, err.returncode, err.stderr)
time.sleep(self.config.statePollingWait)
else:
logger.error("Failed operation %s, code %d: %s",
for attempt in old_retry(
# Don't retry more often than the state polling wait.
delays=[max(delay, self.config.statePollingWait) for delay in DEFAULT_DELAYS],
timeout=self.config.state_polling_timeout,
predicate=lambda e: isinstance(e, CalledProcessErrorStderr)
):
with attempt:
try:
return operation(*args, **kwargs)
except CalledProcessErrorStderr as err:
logger.error("Errored operation %s, code %d: %s",
operation.__name__, err.returncode, err.stderr)
# Raise up to the retry logic, which will retry until timeout
raise err
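For reference, the retry pattern `with_retries` now uses, shown in isolation. This sketch assumes only what is visible in the diff: `old_retry` yields one context manager per attempt, swallows exceptions matching the predicate, and gives up once the timeout elapses. The helper name and default numbers are illustrative.

```python
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import DEFAULT_DELAYS, old_retry


def poll_scheduler(operation, polling_wait=60, polling_timeout=1200):
    """Call operation() until it succeeds, retrying scheduler errors until timeout."""
    for attempt in old_retry(
        # Never retry more often than the configured state polling wait.
        delays=[max(delay, polling_wait) for delay in DEFAULT_DELAYS],
        timeout=polling_timeout,
        predicate=lambda e: isinstance(e, CalledProcessErrorStderr),
    ):
        with attempt:
            # A CalledProcessErrorStderr raised here is retried; anything else
            # propagates immediately, as in with_retries above.
            return operation()
```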
src/toil/batchSystems/options.py (9 additions & 0 deletions)
@@ -76,6 +76,7 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette
set_option("manualMemArgs")
set_option("run_local_jobs_on_workers")
set_option("statePollingWait")
set_option("state_polling_timeout")
set_option("batch_logs_dir")


@@ -164,6 +165,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -
"Return cached results if within the waiting period. Only works for grid "
"engine batch systems such as gridengine, htcondor, torque, slurm, and lsf."
)
parser.add_argument(
"--statePollingTimeout",
dest="state_polling_timeout",
type=int,
default=1200,
help="Time, in seconds, to retry against a broken scheduler. Only works for grid "
"engine batch systems such as gridengine, htcondor, torque, slurm, and lsf."
)
parser.add_argument(
"--batchLogsDir",
dest="batch_logs_dir",
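Two defaults interact here: the new `--statePollingTimeout` option defaults to 1200 seconds, and if the configured value is falsy the grid engine worker falls back to ten polling waits (see the `__init__` change in abstractGridEngineBatchSystem.py above). A small sketch of that fallback, with an invented function name:

```python
def effective_polling_timeout(state_polling_timeout, state_polling_wait):
    # Mirrors: config.state_polling_timeout or config.statePollingWait * 10
    return state_polling_timeout or state_polling_wait * 10


assert effective_polling_timeout(1200, 60) == 1200  # explicit timeout wins
assert effective_polling_timeout(None, 30) == 300   # fall back to 10x the wait
```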
src/toil/batchSystems/slurm.py (62 additions & 20 deletions)
@@ -16,9 +16,9 @@
import os
from argparse import ArgumentParser, _ArgumentGroup
from shlex import quote
from typing import Dict, List, Optional, Tuple, TypeVar, Union
from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union

from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources
from toil.batchSystems.abstractGridEngineBatchSystem import \
AbstractGridEngineBatchSystem
from toil.batchSystems.options import OptionSetter
@@ -27,6 +27,46 @@

logger = logging.getLogger(__name__)

# We have a complete list of Slurm states. States not in one of these aren't
# allowed. See <https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES>

# If a job is in one of these states, Slurm can't run it anymore.
# We don't include states where the job is held or paused here;
# those mean it could run and needs to wait for someone to un-hold
# it, so Toil should wait for it.
#
# We map from each terminal state to the Toil-ontology exit reason.
TERMINAL_STATES: Dict[str, BatchJobExitReason] = {
"BOOT_FAIL": BatchJobExitReason.LOST,
"CANCELLED": BatchJobExitReason.KILLED,
"COMPLETED": BatchJobExitReason.FINISHED,
"DEADLINE": BatchJobExitReason.KILLED,
"FAILED": BatchJobExitReason.FAILED,
"NODE_FAIL": BatchJobExitReason.LOST,
"OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT,
"PREEMPTED": BatchJobExitReason.KILLED,
"REVOKED": BatchJobExitReason.KILLED,
"SPECIAL_EXIT": BatchJobExitReason.FAILED,
"TIMEOUT": BatchJobExitReason.KILLED
}

# If a job is in one of these states, it might eventually move to a different
# state.
NONTERMINAL_STATES: Set[str] = {
"CONFIGURING",
"COMPLETING",
"PENDING",
"RUNNING",
"RESV_DEL_HOLD",
"REQUEUE_FED",
"REQUEUE_HOLD",
"REQUEUED",
"RESIZING",
"SIGNALING",
"STAGE_OUT",
"STOPPED",
"SUSPENDED"
}
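(Aside, not part of slurm.py.) How these module-level tables are meant to be consulted, mirroring the lookup `_get_job_return_code` performs further down; the state value is just an example:

```python
# Decide what to do with a job whose reported state is OUT_OF_MEMORY.
state = "OUT_OF_MEMORY"
if state in TERMINAL_STATES:
    exit_reason = TERMINAL_STATES[state]  # BatchJobExitReason.MEMLIMIT
elif state in NONTERMINAL_STATES:
    exit_reason = None  # job may still run or finish; keep polling
else:
    raise RuntimeError("Toil job in unimplemented Slurm state " + state)
```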

class SlurmBatchSystem(AbstractGridEngineBatchSystem):

@@ -165,24 +205,6 @@ def _get_job_return_code(self, status: tuple) -> Union[int, Tuple[int, Optional[
"""
state, rc = status

# If a job is in one of these states, Slurm can't run it anymore.
# We don't include states where the job is held or paused here;
# those mean it could run and needs to wait for someone to un-hold
# it, so Toil should wait for it.
#
# We map from each terminal state to the Toil-ontology exit reason.
TERMINAL_STATES: Dict[str, BatchJobExitReason] = {
"BOOT_FAIL": BatchJobExitReason.LOST,
"CANCELLED": BatchJobExitReason.KILLED,
"COMPLETED": BatchJobExitReason.FINISHED,
"DEADLINE": BatchJobExitReason.KILLED,
"FAILED": BatchJobExitReason.FAILED,
"NODE_FAIL": BatchJobExitReason.LOST,
"OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT,
"PREEMPTED": BatchJobExitReason.KILLED,
"TIMEOUT": BatchJobExitReason.KILLED
}

if state not in TERMINAL_STATES:
# Don't treat the job as exited yet
return None
@@ -204,6 +226,24 @@ def _get_job_return_code(self, status: tuple) -> Union[int, Tuple[int, Optional[
# If the code is nonzero, pass it along.
return (rc, exit_reason)

def _canonicalize_state(self, state: str) -> str:
"""
Turn a state string from Slurm into just the state token, like "CANCELLED".
"""

# Slurm will sometimes send something like "CANCELLED by 30065" in
# the state column for some reason.

state_token = state

if " " in state_token:
state_token = state.split(" ", 1)[0]

if state_token not in TERMINAL_STATES and state_token not in NONTERMINAL_STATES:
raise RuntimeError("Toil job in unimplemented Slurm state " + state)

return state_token

def _getJobDetailsFromSacct(self, job_id_list: list) -> dict:
"""
Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`.
@@ -231,6 +271,7 @@ def _getJobDetailsFromSacct(self, job_id_list: list) -> dict:
if len(values) < 3:
continue
job_id_raw, state, exitcode = values
state = self._canonicalize_state(state)
logger.debug("%s state of job %s is %s", args[0], job_id_raw, state)
# JobIDRaw is in the form JobID[.JobStep]; we're not interested in job steps.
job_id_parts = job_id_raw.split(".")
@@ -305,6 +346,7 @@ def _getJobDetailsFromScontrol(self, job_id_list: list) -> dict:
if job_id not in job_id_list:
continue
state = job['JobState']
state = self._canonicalize_state(state)
logger.debug("%s state of job %s is %s", args[0], job_id, state)
try:
exitcode = job['ExitCode']
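A standalone sketch of the canonicalization both parsers above now apply, assuming the module-level state tables introduced earlier in this file; the free-function form and the assertions are illustrative only:

```python
def canonicalize_state(state: str) -> str:
    # Slurm sometimes reports e.g. "CANCELLED by 30065"; keep only the state token
    # and reject anything Toil does not have a mapping for.
    token = state.split(" ", 1)[0] if " " in state else state
    if token not in TERMINAL_STATES and token not in NONTERMINAL_STATES:
        raise RuntimeError("Toil job in unimplemented Slurm state " + state)
    return token


assert canonicalize_state("CANCELLED by 30065") == "CANCELLED"
assert canonicalize_state("RUNNING") == "RUNNING"
```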
src/toil/common.py (1 addition & 0 deletions)
@@ -138,6 +138,7 @@ class Config:
"""The backing scheduler will be instructed, if possible, to save logs
to this directory, where the leader can read them."""
statePollingWait: int
state_polling_timeout: int
disableAutoDeployment: bool

# Core options