diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index 2aa855bb48..d201c2f745 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -70,6 +70,9 @@ class SlurmProvider(ClusterProvider, RepresentationMixin): Slurm queue to place job in. If unspecified or ``None``, no queue slurm directive will be specified. constraint : str Slurm job constraint, often used to choose cpu or gpu type. If unspecified or ``None``, no constraint slurm directive will be added. + clusters : str + Slurm cluster name, or comma seperated cluster list, used to choose between different clusters in a federated Slurm instance. + If unspecified or ``None``, no slurm directive for clusters will be added. channel : Channel Channel for accessing this provider. nodes_per_block : int @@ -116,6 +119,7 @@ def __init__(self, account: Optional[str] = None, qos: Optional[str] = None, constraint: Optional[str] = None, + clusters: Optional[str] = None, channel: Channel = LocalChannel(), nodes_per_block: int = 1, cores_per_node: Optional[int] = None, @@ -152,6 +156,7 @@ def __init__(self, self.account = account self.qos = qos self.constraint = constraint + self.clusters = clusters self.scheduler_options = scheduler_options + '\n' if exclusive: self.scheduler_options += "#SBATCH --exclusive\n" @@ -163,6 +168,8 @@ def __init__(self, self.scheduler_options += "#SBATCH --qos={}\n".format(qos) if constraint: self.scheduler_options += "#SBATCH --constraint={}\n".format(constraint) + if clusters: + self.scheduler_options += "#SBATCH --clusters={}\n".format(clusters) self.regex_job_id = regex_job_id self.worker_init = worker_init + '\n' @@ -174,14 +181,22 @@ def __init__(self, logger.debug(f"sacct returned retcode={retcode} stderr={stderr}") if retcode == 0: logger.debug("using sacct to get job status") + _cmd = "sacct" + # Add clusters option to sacct if provided + if self.clusters: + _cmd += f" --clusters={self.clusters}" # Using state%20 to get enough characters to not truncate output # of the state. Without output can look like " CANCELLED+" - self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'" + self._cmd = _cmd + " -X --noheader --format=jobid,state%20 --job '{0}'" self._translate_table = sacct_translate_table else: logger.debug(f"sacct failed with retcode={retcode}") logger.debug("falling back to using squeue to get job status") - self._cmd = "squeue --noheader --format='%i %t' --job '{0}'" + _cmd = "squeue" + # Add clusters option to squeue if provided + if self.clusters: + _cmd += f" --clusters={self.clusters}" + self._cmd = _cmd + " --noheader --format='%i %t' --job '{0}'" self._translate_table = squeue_translate_table def _status(self): @@ -344,7 +359,14 @@ def cancel(self, job_ids): ''' job_id_list = ' '.join(job_ids) - retcode, stdout, stderr = self.execute_wait("scancel {0}".format(job_id_list)) + + # Make the command to cancel jobs + _cmd = "scancel" + if self.clusters: + _cmd += f" --clusters={self.clusters}" + _cmd += " {0}" + + retcode, stdout, stderr = self.execute_wait(_cmd.format(job_id_list)) rets = None if retcode == 0: for jid in job_ids: