Skip to content

Commit

Permalink
init function
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Sep 28, 2024
1 parent 94ef6d9 commit 82da893
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions pysqa/utils/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from pysqa.utils.execute import execute_command
from pysqa.utils.queues import Queues
from pysqa.wrapper.generic import SchedulerCommands

queue_type_dict = {
"SGE": {
Expand Down Expand Up @@ -50,6 +51,32 @@
}


def get_queue_commands(queue_type: str) -> Union[SchedulerCommands, None]:
"""
Load queuing system commands class
Args:
queue_type (str): Type of the queuing system in capital letters
Returns:
SchedulerCommands: queuing system commands class instance
"""
if queue_type in queue_type_dict.keys():
class_name = queue_type_dict[queue_type]["class_name"]
module_name = queue_type_dict[queue_type]["module_name"]
if module_name is not None and class_name is not None:
return getattr(importlib.import_module(module_name), class_name)()
else:
return None
else:
raise ValueError(
"The queue_type "
+ queue_type
+ " is not found in the list of supported queue types "
+ str(list(queue_type_dict.keys()))
)


class BasisQueueAdapter(object):
"""
The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
Expand All @@ -76,18 +103,7 @@ def __init__(
self._config = config
self._fill_queue_dict(queue_lst_dict=self._config["queues"])
self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
if self._config["queue_type"] in queue_type_dict.keys():
class_name = queue_type_dict[self._config["queue_type"]]["class_name"]
module_name = queue_type_dict[self._config["queue_type"]]["module_name"]
else:
raise ValueError(
"The queue_type "
+ self._config["queue_type"]
+ " is not found in the list of supported queue types "
+ str(list(queue_type_dict.keys()))
)
if self._config["queue_type"] != "REMOTE":
self._commands = getattr(importlib.import_module(module_name), class_name)()
self._commands = get_queue_commands(queue_type=self._config["queue_type"])
self._queues = Queues(self.queue_list)
self._remote_flag = False
self._ssh_delete_file_on_remote = True
Expand Down Expand Up @@ -166,7 +182,7 @@ def submit_job(
dependency_list: Optional[List[str]] = None,
command: Optional[str] = None,
**kwargs,
) -> int:
) -> Union[int, None]:
"""
Submit a job to the queue.
Expand Down Expand Up @@ -240,7 +256,7 @@ def enable_reservation(self, process_id: int):
else:
return None

def delete_job(self, process_id: int) -> str:
def delete_job(self, process_id: int) -> Union[str, None]:
"""
Delete a job.
Expand Down Expand Up @@ -287,7 +303,7 @@ def get_status_of_my_jobs(self) -> pandas.DataFrame:
"""
return self.get_queue_status(user=self._get_user())

def get_status_of_job(self, process_id: int) -> str:
def get_status_of_job(self, process_id: int) -> Union[str, None]:
"""
Get the status of a job.
Expand Down Expand Up @@ -363,12 +379,12 @@ def transfer_file(

def check_queue_parameters(
self,
queue: str,
queue: Optional[str],
cores: int = 1,
run_time_max: Optional[int] = None,
memory_max: Optional[int] = None,
active_queue: Optional[dict] = None,
) -> list:
) -> Tuple[Union[float, int, None], Union[float, int, None], Union[float, int, None]]:
"""
Check the parameters of a queue.
Expand Down

0 comments on commit 82da893

Please sign in to comment.