diff --git a/pysqa/cmd.py b/pysqa/cmd.py index ef5b18a4..4a1c589f 100644 --- a/pysqa/cmd.py +++ b/pysqa/cmd.py @@ -25,7 +25,7 @@ def command_line(argv): try: opts, args = getopt.getopt( argv, - "f:pq:j:w:n:m:t:c:ri:dslh", + "f:pq:j:w:n:m:t:b:c:ri:dslh", [ "config_directory=", "submit", @@ -35,6 +35,7 @@ def command_line(argv): "cores=", "memory=", "run_time=", + "dependency=", "command=", "reservation", "id=", @@ -53,6 +54,7 @@ def command_line(argv): mode_reservation = False mode_status = False mode_list = False + dependency_list = None for opt, arg in opts: if opt in ("-f", "--config_directory"): directory = arg @@ -83,6 +85,11 @@ def command_line(argv): mode_status = True elif opt in ("-l", "--list"): mode_list = True + elif opt in ("-b", "--dependency"): + if dependency_list is None: + dependency_list = [arg] + else: + dependency_list.append(arg) elif opt in ("-h", "--help"): print("cmd.py help ... coming soon.") sys.exit() @@ -97,6 +104,7 @@ def command_line(argv): cores=cores, memory_max=memory_max, run_time_max=run_time_max, + dependency_list=dependency_list, command=command, ) ) diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index 68a16fe5..1a23b19f 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -136,6 +136,7 @@ def submit_job( cores=None, memory_max=None, run_time_max=None, + dependency_list=None, command=None, ): """ @@ -148,6 +149,7 @@ def submit_job( cores (int/None): Number of hardware threads requested memory_max (int/None): Amount of memory requested per node in GB run_time_max (int/None): Maximum runtime in seconds + dependency_list (list[str]/None): Job ids of jobs to be completed before starting command (str/None): shell command to run in the job Returns: diff --git a/pysqa/utils/basic.py b/pysqa/utils/basic.py index 1c0ed46b..4ef77abd 100644 --- a/pysqa/utils/basic.py +++ b/pysqa/utils/basic.py @@ -121,6 +121,7 @@ def submit_job( cores=None, memory_max=None, run_time_max=None, + dependency_list=None, command=None, ):
""" @@ -132,6 +133,7 @@ def submit_job( cores (int/None): memory_max (int/None): run_time_max (int/None): + dependency_list (list[str]/None): command (str/None): Returns: @@ -151,7 +153,9 @@ def submit_job( command=command, ) out = self._execute_command( - commands=self._commands.submit_job_command + [queue_script_path], + commands=self._list_command_to_be_executed( + dependency_list, queue_script_path + ), working_directory=working_directory, split_output=False, ) @@ -160,6 +164,13 @@ def submit_job( else: return None + def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list: + return ( + self._commands.submit_job_command + + self._commands.dependencies(dependency_list) + + [queue_script_path] + ) + def enable_reservation(self, process_id): """ diff --git a/pysqa/utils/modular.py b/pysqa/utils/modular.py index e4416f11..274a217f 100644 --- a/pysqa/utils/modular.py +++ b/pysqa/utils/modular.py @@ -23,6 +23,7 @@ def submit_job( cores=None, memory_max=None, run_time_max=None, + dependency_list=None, command=None, ): """ @@ -34,6 +35,7 @@ def submit_job( cores (int/None): memory_max (int/None): run_time_max (int/None): + dependency_list (list/None): command (str/None): Returns: @@ -49,11 +51,9 @@ def submit_job( command=command, ) cluster_module = self._queue_to_cluster_dict[queue] - commands = ( - self._switch_cluster_command(cluster_module=cluster_module) - + self._commands.submit_job_command - + [queue_script_path] - ) + commands = self._switch_cluster_command( + cluster_module=cluster_module + ) + self._list_command_to_be_executed(dependency_list, queue_script_path) out = self._execute_command( commands=commands, working_directory=working_directory, diff --git a/pysqa/utils/remote.py b/pysqa/utils/remote.py index 1b3066cc..7dd1dad1 100644 --- a/pysqa/utils/remote.py +++ b/pysqa/utils/remote.py @@ -48,8 +48,13 @@ def submit_job( cores=None, memory_max=None, run_time_max=None, + dependency_list=None, command=None, ): + if dependency_list is
not None: + raise NotImplementedError( + "Submitting jobs with dependencies to a remote cluster is not yet supported." + ) self._transfer_data_to_remote(working_directory=working_directory) output = self._execute_remote_command(command=command) return int(output.split()[-1]) diff --git a/pysqa/wrapper/generic.py b/pysqa/wrapper/generic.py new file mode 100644 index 00000000..327b6e9a --- /dev/null +++ b/pysqa/wrapper/generic.py @@ -0,0 +1,52 @@ +# coding: utf-8 +# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department +# Distributed under the terms of "New BSD License", see the LICENSE file. + +from abc import ABC, abstractmethod + +__author__ = "Niklas Siemer" +__copyright__ = ( + "Copyright 2022, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) +__version__ = "1.0" +__maintainer__ = "Niklas Siemer" +__email__ = "siemer@mpie.de" +__status__ = "production" +__date__ = "Aug 15, 2022" + + +class SchedulerCommands(ABC): + @property + @abstractmethod + def submit_job_command(self): + pass + + @property + @abstractmethod + def delete_job_command(self): + pass + + @property + def enable_reservation_command(self): + raise NotImplementedError() + + @property + @abstractmethod + def get_queue_status_command(self): + pass + + @staticmethod + def dependencies(dependency_list) -> list: + if dependency_list is not None: + raise NotImplementedError() + else: + return [] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + raise NotImplementedError() + + @staticmethod + def convert_queue_status(queue_status_output): + raise NotImplementedError() diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py index f7a4cc7e..946eaa0c 100644 --- a/pysqa/wrapper/gent.py +++ b/pysqa/wrapper/gent.py @@ -50,3 +50,10 @@ def convert_queue_status(queue_status_output): "status": status_lst, } ) + + @staticmethod + def dependencies(dependency_list) -> list: + if 
dependency_list is not None: + raise NotImplementedError() + else: + return [] diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index 84cb4c61..d99db0d0 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -2,6 +2,8 @@ # Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department # Distributed under the terms of "New BSD License", see the LICENSE file. +from pysqa.wrapper.generic import SchedulerCommands + __author__ = "Jan Janssen" __copyright__ = ( "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " @@ -14,7 +16,7 @@ __date__ = "Feb 9, 2019" -class LsfCommands(object): +class LsfCommands(SchedulerCommands): @property def submit_job_command(self): return ["bsub", "-terse"] @@ -23,18 +25,6 @@ def submit_job_command(self): def delete_job_command(self): return ["bkill"] - @property - def enable_reservation_command(self): - raise NotImplementedError() - @property def get_queue_status_command(self): return ["qstat", "-x"] - - @staticmethod - def get_job_id_from_output(queue_submit_output): - raise NotImplementedError() - - @staticmethod - def convert_queue_status(queue_status_output): - raise NotImplementedError() diff --git a/pysqa/wrapper/moab.py b/pysqa/wrapper/moab.py index 4d63d04b..19a6d0e6 100644 --- a/pysqa/wrapper/moab.py +++ b/pysqa/wrapper/moab.py @@ -2,6 +2,8 @@ # Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department # Distributed under the terms of "New BSD License", see the LICENSE file. 
+from pysqa.wrapper.generic import SchedulerCommands + __author__ = "Jan Janssen" __copyright__ = ( "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " @@ -14,7 +16,7 @@ __date__ = "Feb 9, 2019" -class MoabCommands(object): +class MoabCommands(SchedulerCommands): @property def submit_job_command(self): return ["msub"] @@ -23,18 +25,6 @@ def submit_job_command(self): def delete_job_command(self): return ["mjobctl", "-c"] - @property - def enable_reservation_command(self): - raise NotImplementedError() - @property def get_queue_status_command(self): return ["mdiag", "-x"] - - @staticmethod - def get_job_id_from_output(queue_submit_output): - raise NotImplementedError() - - @staticmethod - def convert_queue_status(queue_status_output): - raise NotImplementedError() diff --git a/pysqa/wrapper/sge.py b/pysqa/wrapper/sge.py index 661fd720..5308aa6a 100644 --- a/pysqa/wrapper/sge.py +++ b/pysqa/wrapper/sge.py @@ -4,6 +4,7 @@ import pandas import defusedxml.cElementTree as ETree +from pysqa.wrapper.generic import SchedulerCommands __author__ = "Jan Janssen" __copyright__ = ( @@ -17,7 +18,7 @@ __date__ = "Feb 9, 2019" -class SunGridEngineCommands(object): +class SunGridEngineCommands(SchedulerCommands): @property def submit_job_command(self): return ["qsub", "-terse"] @@ -34,10 +35,6 @@ def enable_reservation_command(self): def get_queue_status_command(self): return ["qstat", "-xml"] - @staticmethod - def get_job_id_from_output(queue_submit_output): - return int(queue_submit_output) - @staticmethod def convert_queue_status(queue_status_output): def leaf_to_dict(leaf): diff --git a/pysqa/wrapper/slurm.py b/pysqa/wrapper/slurm.py index b4e52cf5..1977fc68 100644 --- a/pysqa/wrapper/slurm.py +++ b/pysqa/wrapper/slurm.py @@ -3,6 +3,7 @@ # Distributed under the terms of "New BSD License", see the LICENSE file. 
import pandas +from pysqa.wrapper.generic import SchedulerCommands __author__ = "Jan Janssen" @@ -17,7 +18,7 @@ __date__ = "Feb 9, 2019" -class SlurmCommands(object): +class SlurmCommands(SchedulerCommands): @property def submit_job_command(self): return ["sbatch", "--parsable"] @@ -26,10 +27,6 @@ def submit_job_command(self): def delete_job_command(self): return ["scancel"] - @property - def enable_reservation_command(self): - raise NotImplementedError() - @property def get_queue_status_command(self): return ["squeue", "--format", "%A|%u|%t|%.15j", "--noheader"] @@ -61,3 +58,10 @@ def convert_queue_status(queue_status_output): df.loc[df.status == "r", "status"] = "running" df.loc[df.status == "pd", "status"] = "pending" return df + + @staticmethod + def dependencies(dependency_list) -> list: + if dependency_list is not None: + return ["--dependency=afterok:" + ",".join(dependency_list)] + else: + return [] diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py index aefe4b76..56e323ba 100644 --- a/pysqa/wrapper/torque.py +++ b/pysqa/wrapper/torque.py @@ -2,6 +2,8 @@ # Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department # Distributed under the terms of "New BSD License", see the LICENSE file. 
+from pysqa.wrapper.generic import SchedulerCommands + __author__ = "Jan Janssen" __copyright__ = ( "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " @@ -14,7 +16,7 @@ __date__ = "Feb 9, 2019" -class TorqueCommands(object): +class TorqueCommands(SchedulerCommands): @property def submit_job_command(self): return ["qsub", "-terse"] @@ -23,18 +25,6 @@ def submit_job_command(self): def delete_job_command(self): return ["qdel"] - @property - def enable_reservation_command(self): - raise NotImplementedError() - @property def get_queue_status_command(self): return ["qstat", "-x"] - - @staticmethod - def get_job_id_from_output(queue_submit_output): - raise NotImplementedError() - - @staticmethod - def convert_queue_status(queue_status_output): - raise NotImplementedError() diff --git a/tests/test_queueadapter.py b/tests/test_queueadapter.py index c21664ef..b6976b74 100644 --- a/tests/test_queueadapter.py +++ b/tests/test_queueadapter.py @@ -151,6 +151,83 @@ def test_interfaces(self): ["squeue", "--format", "%A|%u|%t|%.15j", "--noheader"], ) + def test__list_command_to_be_executed(self): + with self.subTest("slurm"): + self.assertEqual( + self.slurm._adapter._list_command_to_be_executed(None, "here"), + ["sbatch", "--parsable", "here"], + ) + with self.subTest("slurm with one dependency"): + self.assertEqual( + self.slurm._adapter._list_command_to_be_executed(["2"], "here"), + ["sbatch", "--parsable", "--dependency=afterok:2", "here"], + ) + with self.subTest("slurm with two dependencies"): + self.assertEqual( + self.slurm._adapter._list_command_to_be_executed(["2", "34"], "here"), + ["sbatch", "--parsable", "--dependency=afterok:2,34", "here"], + ) + with self.subTest("torque"): + self.assertEqual( + self.torque._adapter._list_command_to_be_executed(None, "here"), + ["qsub", "-terse", "here"], + ) + with self.subTest("torque with dependency"): + self.assertRaises( + NotImplementedError, + self.torque._adapter._list_command_to_be_executed, + [], + "here", + ) 
+ with self.subTest("moab with dependency"): + self.assertRaises( + NotImplementedError, + self.moab._adapter._list_command_to_be_executed, + [], + "here", + ) + with self.subTest("moab"): + self.assertEqual( + self.moab._adapter._list_command_to_be_executed(None, "here"), + ["msub", "here"], + ) + with self.subTest("gent"): + self.assertEqual( + self.gent._adapter._list_command_to_be_executed(None, "here"), + ["sbatch", "--parsable", "here"], + ) + with self.subTest("gent with dependency"): + self.assertRaises( + NotImplementedError, + self.gent._adapter._list_command_to_be_executed, + [], + "here", + ) + with self.subTest("sge"): + self.assertEqual( + self.sge._adapter._list_command_to_be_executed(None, "here"), + ["qsub", "-terse", "here"], + ) + with self.subTest("sge with dependency"): + self.assertRaises( + NotImplementedError, + self.sge._adapter._list_command_to_be_executed, + [], + "here", + ) + with self.subTest("lsf"): + self.assertEqual( + self.lsf._adapter._list_command_to_be_executed(None, "here"), + ["bsub", "-terse", "here"], + ) + with self.subTest("lsf with dependency"): + self.assertRaises( + NotImplementedError, + self.lsf._adapter._list_command_to_be_executed, + [], + "here", + ) + def test_convert_queue_status(self): with open(os.path.join(self.path, "config/sge", "qstat.xml"), "r") as f: content = f.read()