diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst
index f8f82d14fe..bd9053aad9 100644
--- a/docs/running/cliOptions.rst
+++ b/docs/running/cliOptions.rst
@@ -208,6 +208,10 @@ levels in toil are based on priority from the logging module:
                         the waiting period. Only works for grid engine batch
                         systems such as gridengine, htcondor, torque, slurm,
                         and lsf.
+  --statePollingTimeout STATEPOLLINGTIMEOUT
+                        Time, in seconds, to retry against a broken scheduler.
+                        Only works for grid engine batch systems such as
+                        gridengine, htcondor, torque, slurm, and lsf.
   --batchLogsDir BATCHLOGSDIR
                         Directory to tell the backing batch system to log
                         into. Should be available on both the leader and the workers,
diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py
index 939863813e..a0cd5e6bb0 100644
--- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py
+++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -25,6 +25,7 @@
 from toil.bus import ExternalBatchIdMessage, get_job_kind
 from toil.job import AcceleratorRequirement
 from toil.lib.misc import CalledProcessErrorStderr
+from toil.lib.retry import old_retry, DEFAULT_DELAYS
 
 logger = logging.getLogger(__name__)
 
@@ -64,6 +65,8 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
             self.boss = boss
             self.boss.config.statePollingWait = \
                 self.boss.config.statePollingWait or self.boss.getWaitDuration()
+            self.boss.config.state_polling_timeout = \
+                self.boss.config.state_polling_timeout or self.boss.config.statePollingWait * 10
             self.newJobsQueue = newJobsQueue
             self.updatedJobsQueue = updatedJobsQueue
             self.killQueue = killQueue
@@ -175,7 +178,8 @@ def killJobs(self):
             while killList:
                 for jobID in list(killList):
                     batchJobID = self.getBatchSystemID(jobID)
-                    if self.boss.with_retries(self.getJobExitCode, batchJobID) is not None:
+                    exit_code = self.boss.with_retries(self.getJobExitCode, batchJobID)
+                    if exit_code is not None:
                         logger.debug('Adding jobID %s to killedJobsQueue', jobID)
                         self.killedJobsQueue.put(jobID)
                         killList.remove(jobID)
@@ -503,21 +507,20 @@ def sleepSeconds(self, sleeptime=1):
 
     def with_retries(self, operation, *args, **kwargs):
         """
-        Call operation with args and kwargs. If one of the calls to an SGE
-        command fails, sleep and try again for a set number of times.
+        Call operation with args and kwargs. If one of the calls to a
+        command fails, sleep and try again until the retry timeout expires.
         """
-        maxTries = 3
-        tries = 0
-        while True:
-            tries += 1
-            try:
-                return operation(*args, **kwargs)
-            except CalledProcessErrorStderr as err:
-                if tries < maxTries:
-                    logger.error("Will retry errored operation %s, code %d: %s",
-                                 operation.__name__, err.returncode, err.stderr)
-                    time.sleep(self.config.statePollingWait)
-                else:
-                    logger.error("Failed operation %s, code %d: %s",
+        for attempt in old_retry(
+            # Don't retry more often than the state polling wait.
+            delays=[max(delay, self.config.statePollingWait) for delay in DEFAULT_DELAYS],
+            timeout=self.config.state_polling_timeout,
+            predicate=lambda e: isinstance(e, CalledProcessErrorStderr)
+        ):
+            with attempt:
+                try:
+                    return operation(*args, **kwargs)
+                except CalledProcessErrorStderr as err:
+                    logger.error("Errored operation %s, code %d: %s",
                                  operation.__name__, err.returncode, err.stderr)
+                    # Raise up to the retry logic, which will retry until timeout.
                     raise err
diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py
index e15423b8ef..b33f9971fb 100644
--- a/src/toil/batchSystems/options.py
+++ b/src/toil/batchSystems/options.py
@@ -76,6 +76,7 @@ def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSette
     set_option("manualMemArgs")
     set_option("run_local_jobs_on_workers")
     set_option("statePollingWait")
+    set_option("state_polling_timeout")
     set_option("batch_logs_dir")
 
 
@@ -164,6 +165,14 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -
         "Return cached results if within the waiting period. Only works for grid "
         "engine batch systems such as gridengine, htcondor, torque, slurm, and lsf."
     )
+    parser.add_argument(
+        "--statePollingTimeout",
+        dest="state_polling_timeout",
+        type=int,
+        default=1200,
+        help="Time, in seconds, to retry against a broken scheduler. Only works for grid "
+        "engine batch systems such as gridengine, htcondor, torque, slurm, and lsf."
+    )
     parser.add_argument(
         "--batchLogsDir",
         dest="batch_logs_dir",
diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py
index 6abdacb16d..3a624cf17c 100644
--- a/src/toil/batchSystems/slurm.py
+++ b/src/toil/batchSystems/slurm.py
@@ -16,9 +16,9 @@
 import os
 from argparse import ArgumentParser, _ArgumentGroup
 from shlex import quote
-from typing import Dict, List, Optional, Tuple, TypeVar, Union
+from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union
 
-from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE
+from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources
 from toil.batchSystems.abstractGridEngineBatchSystem import \
     AbstractGridEngineBatchSystem
 from toil.batchSystems.options import OptionSetter
@@ -27,6 +27,46 @@
 
 logger = logging.getLogger(__name__)
 
+# We have a complete list of Slurm states. States not in one of these aren't
+# allowed. See
+
+# If a job is in one of these states, Slurm can't run it anymore.
+# We don't include states where the job is held or paused here;
+# those mean it could run and needs to wait for someone to un-hold
+# it, so Toil should wait for it.
+#
+# We map from each terminal state to the Toil-ontology exit reason.
+TERMINAL_STATES: Dict[str, BatchJobExitReason] = {
+    "BOOT_FAIL": BatchJobExitReason.LOST,
+    "CANCELLED": BatchJobExitReason.KILLED,
+    "COMPLETED": BatchJobExitReason.FINISHED,
+    "DEADLINE": BatchJobExitReason.KILLED,
+    "FAILED": BatchJobExitReason.FAILED,
+    "NODE_FAIL": BatchJobExitReason.LOST,
+    "OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT,
+    "PREEMPTED": BatchJobExitReason.KILLED,
+    "REVOKED": BatchJobExitReason.KILLED,
+    "SPECIAL_EXIT": BatchJobExitReason.FAILED,
+    "TIMEOUT": BatchJobExitReason.KILLED
+}
+
+# If a job is in one of these states, it might eventually move to a different
+# state.
+NONTERMINAL_STATES: Set[str] = {
+    "CONFIGURING",
+    "COMPLETING",
+    "PENDING",
+    "RUNNING",
+    "RESV_DEL_HOLD",
+    "REQUEUE_FED",
+    "REQUEUE_HOLD",
+    "REQUEUED",
+    "RESIZING",
+    "SIGNALING",
+    "STAGE_OUT",
+    "STOPPED",
+    "SUSPENDED"
+}
 
 class SlurmBatchSystem(AbstractGridEngineBatchSystem):
 
@@ -165,24 +205,6 @@ def _get_job_return_code(self, status: tuple) -> Union[int, Tuple[int, Optional[
             """
             state, rc = status
 
-            # If a job is in one of these states, Slurm can't run it anymore.
-            # We don't include states where the job is held or paused here;
-            # those mean it could run and needs to wait for someone to un-hold
-            # it, so Toil should wait for it.
-            #
-            # We map from each terminal state to the Toil-ontology exit reason.
-            TERMINAL_STATES: Dict[str, BatchJobExitReason] = {
-                "BOOT_FAIL": BatchJobExitReason.LOST,
-                "CANCELLED": BatchJobExitReason.KILLED,
-                "COMPLETED": BatchJobExitReason.FINISHED,
-                "DEADLINE": BatchJobExitReason.KILLED,
-                "FAILED": BatchJobExitReason.FAILED,
-                "NODE_FAIL": BatchJobExitReason.LOST,
-                "OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT,
-                "PREEMPTED": BatchJobExitReason.KILLED,
-                "TIMEOUT": BatchJobExitReason.KILLED
-            }
-
             if state not in TERMINAL_STATES:
                 # Don't treat the job as exited yet
                 return None
@@ -204,6 +226,24 @@ def _get_job_return_code(self, status: tuple) -> Union[int, Tuple[int, Optional[
             # If the code is nonzero, pass it along.
             return (rc, exit_reason)
 
+        def _canonicalize_state(self, state: str) -> str:
+            """
+            Turn a state string from SLURM into just the state token like "CANCELLED".
+            """
+
+            # Slurm will sometimes send something like "CANCELLED by 30065" in
+            # the state column for some reason.
+
+            state_token = state
+
+            if " " in state_token:
+                state_token = state.split(" ", 1)[0]
+
+            if state_token not in TERMINAL_STATES and state_token not in NONTERMINAL_STATES:
+                raise RuntimeError("Toil job in unimplemented Slurm state " + state)
+
+            return state_token
+
         def _getJobDetailsFromSacct(self, job_id_list: list) -> dict:
             """
             Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`.
@@ -231,6 +271,7 @@ def _getJobDetailsFromSacct(self, job_id_list: list) -> dict:
                 if len(values) < 3:
                     continue
                 job_id_raw, state, exitcode = values
+                state = self._canonicalize_state(state)
                 logger.debug("%s state of job %s is %s", args[0], job_id_raw, state)
                 # JobIDRaw is in the form JobID[.JobStep]; we're not interested in job steps.
                 job_id_parts = job_id_raw.split(".")
@@ -305,6 +346,7 @@ def _getJobDetailsFromScontrol(self, job_id_list: list) -> dict:
                 if job_id not in job_id_list:
                     continue
                 state = job['JobState']
+                state = self._canonicalize_state(state)
                 logger.debug("%s state of job %s is %s", args[0], job_id, state)
                 try:
                     exitcode = job['ExitCode']
diff --git a/src/toil/common.py b/src/toil/common.py
index 70e278710d..30899d9776 100644
--- a/src/toil/common.py
+++ b/src/toil/common.py
@@ -138,6 +138,7 @@ class Config:
     """The backing scheduler will be instructed, if possible, to save logs
     to this directory, where the leader can read them."""
     statePollingWait: int
+    state_polling_timeout: int
     disableAutoDeployment: bool
 
     # Core options
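
Reviewer note (not part of the patch): the sketch below illustrates the retry pattern that the new with_retries() adopts, using old_retry and DEFAULT_DELAYS from toil.lib.retry as in the hunk above. Everything else here is an assumption made up for the example: the query_scheduler/query_with_retries helpers, the sacct invocation, and the 10-second wait and 1200-second timeout stand-ins for config.statePollingWait and config.state_polling_timeout.

# Illustrative sketch only, not part of the patch: retry a transiently
# failing scheduler query the way the new with_retries() does.
from subprocess import CalledProcessError, check_output

from toil.lib.retry import DEFAULT_DELAYS, old_retry

STATE_POLLING_WAIT = 10       # stand-in for config.statePollingWait
STATE_POLLING_TIMEOUT = 1200  # stand-in for config.state_polling_timeout


def query_scheduler() -> bytes:
    # Stand-in for a scheduler command that may fail while the scheduler
    # is overloaded or briefly unreachable.
    return check_output(["sacct", "-n", "-X", "--format", "JobIDRaw,State,ExitCode"])


def query_with_retries() -> bytes:
    # old_retry() yields context managers: an exception matching the
    # predicate is swallowed, the loop sleeps, and the operation is tried
    # again until the timeout budget is spent, after which the exception
    # propagates to the caller.
    for attempt in old_retry(
        # Never retry more often than the state polling wait.
        delays=[max(delay, STATE_POLLING_WAIT) for delay in DEFAULT_DELAYS],
        timeout=STATE_POLLING_TIMEOUT,
        predicate=lambda e: isinstance(e, CalledProcessError),
    ):
        with attempt:
            return query_scheduler()

The floor on the delays matters because DEFAULT_DELAYS begins with very short intervals, which would otherwise poll a struggling scheduler faster than the configured state polling wait; the timeout then bounds how long a broken scheduler is retried before the error is surfaced.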