Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for job dependencies #132

Merged
merged 10 commits into from
Oct 13, 2022
10 changes: 9 additions & 1 deletion pysqa/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def command_line(argv):
try:
opts, args = getopt.getopt(
argv,
"f:pq:j:w:n:m:t:c:ri:dslh",
"f:pq:j:w:n:m:t:b:c:ri:dslh",
[
"config_directory=",
"submit",
Expand All @@ -35,6 +35,7 @@ def command_line(argv):
"cores=",
"memory=",
"run_time=",
"dependency=",
"command=",
"reservation",
"id=",
Expand All @@ -53,6 +54,7 @@ def command_line(argv):
mode_reservation = False
mode_status = False
mode_list = False
dependency_list = None
for opt, arg in opts:
if opt in ("-f", "--config_directory"):
directory = arg
Expand Down Expand Up @@ -83,6 +85,11 @@ def command_line(argv):
mode_status = True
elif opt in ("-l", "--list"):
mode_list = True
elif opt in ("-b", "--dependency"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if -b is the best choice for dependencies. But as I do not have a better suggestion either as -d is already in use for delete, I am fine with this.

if dependency_list is None:
dependency_list = [arg]
else:
dependency_list.append(arg)
elif opt in ("-h", "--help"):
print("cmd.py help ... coming soon.")
sys.exit()
Expand All @@ -97,6 +104,7 @@ def command_line(argv):
cores=cores,
memory_max=memory_max,
run_time_max=run_time_max,
dependency_list=dependency_list,
command=command,
)
)
Expand Down
2 changes: 2 additions & 0 deletions pysqa/queueadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def submit_job(
cores=None,
memory_max=None,
run_time_max=None,
dependency_list=None,
command=None,
):
"""
Expand All @@ -148,6 +149,7 @@ def submit_job(
cores (int/None): Number of hardware threads requested
memory_max (int/None): Amount of memory requested per node in GB
run_time_max (int/None): Maximum runtime in seconds
dependency_list (list[str]/None): Job ids of jobs to be completed before starting
command (str/None): shell command to run in the job

Returns:
Expand Down
13 changes: 12 additions & 1 deletion pysqa/utils/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def submit_job(
cores=None,
memory_max=None,
run_time_max=None,
dependency_list=None,
command=None,
):
"""
Expand All @@ -132,6 +133,7 @@ def submit_job(
cores (int/None):
memory_max (int/None):
run_time_max (int/None):
dependency_list (list[str]/None):
command (str/None):

Returns:
Expand All @@ -151,7 +153,9 @@ def submit_job(
command=command,
)
out = self._execute_command(
commands=self._commands.submit_job_command + [queue_script_path],
commands=self._list_command_to_be_executed(
dependency_list, queue_script_path
),
working_directory=working_directory,
split_output=False,
)
Expand All @@ -160,6 +164,13 @@ def submit_job(
else:
return None

def _list_command_to_be_executed(self, dependency_list, queue_script_path) -> list:
    """
    Assemble the full submit command for a queue script.

    Args:
        dependency_list (list/None): job dependencies, forwarded unchanged to
            self._commands.dependencies()
        queue_script_path (str): path of the submission script, placed last

    Returns:
        list: submit command, followed by the dependency options, followed by the script path
    """
    scheduler = self._commands
    submit_prefix = scheduler.submit_job_command
    dependency_options = scheduler.dependencies(dependency_list)
    return submit_prefix + dependency_options + [queue_script_path]

def enable_reservation(self, process_id):
"""

Expand Down
10 changes: 5 additions & 5 deletions pysqa/utils/modular.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def submit_job(
cores=None,
memory_max=None,
run_time_max=None,
dependency_list=None,
command=None,
):
"""
Expand All @@ -34,6 +35,7 @@ def submit_job(
cores (int/None):
memory_max (int/None):
run_time_max (int/None):
dependency_list (list/None):
command (str/None):

Returns:
Expand All @@ -49,11 +51,9 @@ def submit_job(
command=command,
)
cluster_module = self._queue_to_cluster_dict[queue]
commands = (
self._switch_cluster_command(cluster_module=cluster_module)
+ self._commands.submit_job_command
+ [queue_script_path]
)
commands = self._switch_cluster_command(
cluster_module=cluster_module
) + self._list_command_to_be_executed(dependency_list, queue_script_path)
out = self._execute_command(
commands=commands,
working_directory=working_directory,
Expand Down
5 changes: 5 additions & 0 deletions pysqa/utils/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ def submit_job(
cores=None,
memory_max=None,
run_time_max=None,
dependency_list=None,
command=None,
):
if dependency_list is not None:
raise NotImplementedError(
"Submitting jobs with dependencies to a remote cluster is not yet supported."
)
self._transfer_data_to_remote(working_directory=working_directory)
output = self._execute_remote_command(command=command)
return int(output.split()[-1])
Expand Down
52 changes: 52 additions & 0 deletions pysqa/wrapper/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from abc import ABC, abstractmethod

__author__ = "Niklas Siemer"
__copyright__ = (
"Copyright 2022, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Niklas Siemer"
__email__ = "[email protected]"
__status__ = "production"
__date__ = "Aug 15, 2022"


class SchedulerCommands(ABC):
    """
    Abstract interface of the queuing system specific command wrappers.

    Subclasses have to provide the commands for submitting jobs, deleting jobs
    and querying the queue status. Everything else is optional and raises a
    NotImplementedError unless the subclass overrides it.
    """

    @property
    @abstractmethod
    def submit_job_command(self):
        """list: command line tokens used to submit a job script"""
        pass

    @property
    @abstractmethod
    def delete_job_command(self):
        """list: command line tokens used to delete a job"""
        pass

    @property
    def enable_reservation_command(self):
        """Optional reservation command; not available unless overridden."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def get_queue_status_command(self):
        """list: command line tokens used to query the queue status"""
        pass

    @staticmethod
    def dependencies(dependency_list) -> list:
        """
        Translate job dependencies into additional submit command options.

        Args:
            dependency_list (list/None): ids of the jobs to be completed before starting

        Returns:
            list: additional command line options, empty if there are no dependencies

        Raises:
            NotImplementedError: if dependencies are requested but the subclass
                does not override this method
        """
        # None and an empty list both mean "nothing to wait for", which every
        # queuing system can handle without extra options.
        if not dependency_list:
            return []
        raise NotImplementedError(
            "Job dependencies are not implemented for this queuing system."
        )

    @staticmethod
    def get_job_id_from_output(queue_submit_output):
        """Optional parser for the submit command output; not available unless overridden."""
        raise NotImplementedError()

    @staticmethod
    def convert_queue_status(queue_status_output):
        """Optional parser for the queue status output; not available unless overridden."""
        raise NotImplementedError()
7 changes: 7 additions & 0 deletions pysqa/wrapper/gent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ def convert_queue_status(queue_status_output):
"status": status_lst,
}
)

@staticmethod
def dependencies(dependency_list) -> list:
    """
    Translate job dependencies into additional submit command options.

    Args:
        dependency_list (list/None): ids of the jobs to be completed before starting

    Returns:
        list: always empty, as no dependency options are generated here

    Raises:
        NotImplementedError: if at least one dependency is requested
    """
    # None and an empty list both mean "nothing to wait for".
    if not dependency_list:
        return []
    raise NotImplementedError(
        "Job dependencies are not implemented for this queuing system."
    )
16 changes: 3 additions & 13 deletions pysqa/wrapper/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from pysqa.wrapper.generic import SchedulerCommands

__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - "
Expand All @@ -14,7 +16,7 @@
__date__ = "Feb 9, 2019"


class LsfCommands(object):
class LsfCommands(SchedulerCommands):
@property
def submit_job_command(self):
return ["bsub", "-terse"]
Expand All @@ -23,18 +25,6 @@ def submit_job_command(self):
def delete_job_command(self):
return ["bkill"]

@property
def enable_reservation_command(self):
raise NotImplementedError()

@property
def get_queue_status_command(self):
return ["qstat", "-x"]

@staticmethod
def get_job_id_from_output(queue_submit_output):
raise NotImplementedError()

@staticmethod
def convert_queue_status(queue_status_output):
raise NotImplementedError()
16 changes: 3 additions & 13 deletions pysqa/wrapper/moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from pysqa.wrapper.generic import SchedulerCommands

__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - "
Expand All @@ -14,7 +16,7 @@
__date__ = "Feb 9, 2019"


class MoabCommands(object):
class MoabCommands(SchedulerCommands):
@property
def submit_job_command(self):
return ["msub"]
Expand All @@ -23,18 +25,6 @@ def submit_job_command(self):
def delete_job_command(self):
return ["mjobctl", "-c"]

@property
def enable_reservation_command(self):
raise NotImplementedError()

@property
def get_queue_status_command(self):
return ["mdiag", "-x"]

@staticmethod
def get_job_id_from_output(queue_submit_output):
raise NotImplementedError()

@staticmethod
def convert_queue_status(queue_status_output):
raise NotImplementedError()
7 changes: 2 additions & 5 deletions pysqa/wrapper/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pandas
import defusedxml.cElementTree as ETree
from pysqa.wrapper.generic import SchedulerCommands

__author__ = "Jan Janssen"
__copyright__ = (
Expand All @@ -17,7 +18,7 @@
__date__ = "Feb 9, 2019"


class SunGridEngineCommands(object):
class SunGridEngineCommands(SchedulerCommands):
@property
def submit_job_command(self):
return ["qsub", "-terse"]
Expand All @@ -34,10 +35,6 @@ def enable_reservation_command(self):
def get_queue_status_command(self):
return ["qstat", "-xml"]

@staticmethod
def get_job_id_from_output(queue_submit_output):
return int(queue_submit_output)

@staticmethod
def convert_queue_status(queue_status_output):
def leaf_to_dict(leaf):
Expand Down
14 changes: 9 additions & 5 deletions pysqa/wrapper/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Distributed under the terms of "New BSD License", see the LICENSE file.

import pandas
from pysqa.wrapper.generic import SchedulerCommands


__author__ = "Jan Janssen"
Expand All @@ -17,7 +18,7 @@
__date__ = "Feb 9, 2019"


class SlurmCommands(object):
class SlurmCommands(SchedulerCommands):
@property
def submit_job_command(self):
return ["sbatch", "--parsable"]
Expand All @@ -26,10 +27,6 @@ def submit_job_command(self):
def delete_job_command(self):
return ["scancel"]

@property
def enable_reservation_command(self):
raise NotImplementedError()

@property
def get_queue_status_command(self):
return ["squeue", "--format", "%A|%u|%t|%.15j", "--noheader"]
Expand Down Expand Up @@ -61,3 +58,10 @@ def convert_queue_status(queue_status_output):
df.loc[df.status == "r", "status"] = "running"
df.loc[df.status == "pd", "status"] = "pending"
return df

@staticmethod
def dependencies(dependency_list) -> list:
    """
    Translate job dependencies into the corresponding sbatch option.

    Args:
        dependency_list (list/None): ids (str or int) of the jobs which have to
            finish successfully before the submitted job starts

    Returns:
        list: ["--dependency=afterok:<id>[:<id>...]"] or an empty list if there
            are no dependencies
    """
    # None and an empty list both mean "nothing to wait for"; emitting a bare
    # "--dependency=afterok:" would produce an invalid option.
    if not dependency_list:
        return []
    # sbatch separates the job ids of one dependency type with ":"; a "," would
    # start a new "type:job_id" specification. str() allows integer job ids.
    job_ids = ":".join(str(job_id) for job_id in dependency_list)
    return ["--dependency=afterok:" + job_ids]
16 changes: 3 additions & 13 deletions pysqa/wrapper/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from pysqa.wrapper.generic import SchedulerCommands

__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - "
Expand All @@ -14,7 +16,7 @@
__date__ = "Feb 9, 2019"


class TorqueCommands(object):
class TorqueCommands(SchedulerCommands):
@property
def submit_job_command(self):
return ["qsub", "-terse"]
Expand All @@ -23,18 +25,6 @@ def submit_job_command(self):
def delete_job_command(self):
return ["qdel"]

@property
def enable_reservation_command(self):
raise NotImplementedError()

@property
def get_queue_status_command(self):
return ["qstat", "-x"]

@staticmethod
def get_job_id_from_output(queue_submit_output):
raise NotImplementedError()

@staticmethod
def convert_queue_status(queue_status_output):
raise NotImplementedError()
Loading