diff --git a/.github/workflows/unittest-mpich.yml b/.github/workflows/unittest-mpich.yml index 2d32edc5..ac568ccc 100644 --- a/.github/workflows/unittest-mpich.yml +++ b/.github/workflows/unittest-mpich.yml @@ -40,11 +40,6 @@ jobs: label: linux-64-py-3-9-mpich prefix: /usr/share/miniconda3/envs/my-env - - operating-system: ubuntu-latest - python-version: 3.8 - label: linux-64-py-3-8-mpich - prefix: /usr/share/miniconda3/envs/my-env - steps: - uses: actions/checkout@v2 - uses: conda-incubator/setup-miniconda@v2.2.0 diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml index 9080055f..e36b7966 100644 --- a/.github/workflows/unittest-openmpi.yml +++ b/.github/workflows/unittest-openmpi.yml @@ -40,11 +40,6 @@ jobs: label: linux-64-py-3-9-openmpi prefix: /usr/share/miniconda3/envs/my-env - - operating-system: ubuntu-latest - python-version: 3.8 - label: linux-64-py-3-8-openmpi - prefix: /usr/share/miniconda3/envs/my-env - steps: - uses: actions/checkout@v2 - uses: conda-incubator/setup-miniconda@v2.2.0 diff --git a/pympipool/__init__.py b/pympipool/__init__.py index aa9bc881..cf8b25bb 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,5 +1,6 @@ import os import shutil +from typing import Optional from ._version import get_versions from pympipool.mpi.executor import PyMPIExecutor from pympipool.shared.interface import SLURM_COMMAND @@ -69,30 +70,30 @@ class Executor: def __init__( self, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, executor=None, - hostname_localhost=False, + hostname_localhost: bool = False, ): # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. pass def __new__( cls, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, executor=None, - hostname_localhost=False, + hostname_localhost: bool = False, ): """ Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor, diff --git a/pympipool/backend/serial.py b/pympipool/backend/serial.py index effdec6b..e6398fdb 100644 --- a/pympipool/backend/serial.py +++ b/pympipool/backend/serial.py @@ -1,5 +1,6 @@ from os.path import abspath import sys +from typing import Optional from pympipool.shared.communication import ( interface_connect, @@ -10,7 +11,7 @@ from pympipool.shared.backend import call_funct, parse_arguments -def main(argument_lst=None): +def main(argument_lst: Optional[list[str]] = None): if argument_lst is None: argument_lst = sys.argv argument_dict = parse_arguments(argument_lst=argument_lst) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 232cd518..5856133a 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -1,4 +1,5 @@ import os +from typing import Optional import flux.job @@ -56,14 +57,14 @@ class PyFluxExecutor(ExecutorBroker): def __init__( self, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - executor=None, - hostname_localhost=False, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, + executor: Optional[flux.job.FluxExecutor] = None, + hostname_localhost: Optional[bool] = False, ): super().__init__() self._set_process( @@ -92,12 +93,12 @@ def __init__( class FluxPythonInterface(BaseInterface): def __init__( self, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, - executor=None, + cwd: Optional[str] = None, + cores: int = 1, + threads_per_core: int = 1, + gpus_per_core: int = 0, + oversubscribe: bool = False, + executor: Optional[flux.job.FluxExecutor] = None, ): super().__init__( cwd=cwd, @@ -109,7 +110,7 @@ def __init__( self._executor = executor self._future = None - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): if self._oversubscribe: raise ValueError( "Oversubscribing is currently not supported for the Flux adapter." @@ -129,7 +130,7 @@ def bootup(self, command_lst): jobspec.cwd = self._cwd self._future = self._executor.submit(jobspec) - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): if self.poll(): self._future.cancel() # The flux future objects are not instantly updated, diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index b28ffd47..b1e39048 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -1,3 +1,5 @@ +from typing import Optional + from pympipool.shared.executorbase import ( execute_parallel_tasks, ExecutorBroker, @@ -51,12 +53,12 @@ class PyMPIExecutor(ExecutorBroker): def __init__( self, - max_workers=1, - cores_per_worker=1, - oversubscribe=False, - init_function=None, - cwd=None, - hostname_localhost=False, + max_workers: int = 1, + cores_per_worker: int = 1, + oversubscribe: bool = False, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, + hostname_localhost: bool = False, ): super().__init__() self._set_process( diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py index 5c917df9..6f9f18ac 100644 --- a/pympipool/shared/backend.py +++ b/pympipool/shared/backend.py @@ -1,7 +1,10 @@ +from typing import Optional import inspect -def call_funct(input_dict, funct=None, memory=None): +def call_funct( + input_dict: dict, funct: Optional[callable] = None, memory: Optional[dict] = None +) -> callable: """ Call function from dictionary @@ -30,7 +33,7 @@ def funct(*args, **kwargs): return funct(input_dict["fn"], *input_dict["args"], **input_dict["kwargs"]) -def parse_arguments(argument_lst): +def parse_arguments(argument_lst: list[str]) -> dict: """ Simple function to parse command line arguments @@ -50,7 +53,9 @@ def parse_arguments(argument_lst): ) -def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict): +def update_default_dict_from_arguments( + argument_lst: list[str], argument_dict: dict, default_dict: dict +) -> dict: default_dict.update( { k: argument_lst[argument_lst.index(v) + 1] @@ -61,7 +66,9 @@ def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict return default_dict -def _update_dict_delta(dict_input, dict_output, keys_possible_lst): +def _update_dict_delta( + dict_input: dict, dict_output: dict, keys_possible_lst: list +) -> dict: return { k: v for k, v in dict_input.items() diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index fad9450d..6d15a325 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -18,7 +18,7 @@ def __init__(self, interface=None): self._process = None self._interface = interface - def send_dict(self, input_dict): + def send_dict(self, input_dict: dict): """ Send a dictionary with instructions to a connected client process. @@ -42,7 +42,7 @@ def receive_dict(self): error_type = output["error_type"].split("'")[1] raise eval(error_type)(output["error"]) - def send_and_receive_dict(self, input_dict): + def send_and_receive_dict(self, input_dict: dict) -> dict: """ Combine both the send_dict() and receive_dict() function in a single call. @@ -66,7 +66,7 @@ def bind_to_random_port(self): """ return self._socket.bind_to_random_port("tcp://*") - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): """ Boot up the client process to connect to the SocketInterface. @@ -75,7 +75,7 @@ def bootup(self, command_lst): """ self._interface.bootup(command_lst=command_lst) - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): result = None if self._interface.poll(): result = self.send_and_receive_dict( @@ -96,9 +96,9 @@ def __del__(self): def interface_bootup( - command_lst, + command_lst: list[str], connections, - hostname_localhost=False, + hostname_localhost: bool = False, ): """ Start interface for ZMQ communication @@ -132,7 +132,7 @@ def interface_bootup( return interface -def interface_connect(host, port): +def interface_connect(host: str, port: str): """ Connect to an existing SocketInterface instance by providing the hostname and the port as strings. @@ -146,7 +146,7 @@ def interface_connect(host, port): return context, socket -def interface_send(socket, result_dict): +def interface_send(socket: zmq.Socket, result_dict: dict): """ Send results to a SocketInterface instance. @@ -157,7 +157,7 @@ def interface_send(socket, result_dict): socket.send(cloudpickle.dumps(result_dict)) -def interface_receive(socket): +def interface_receive(socket: zmq.Socket): """ Receive instructions from a SocketInterface instance. @@ -167,7 +167,7 @@ def interface_receive(socket): return cloudpickle.loads(socket.recv()) -def interface_shutdown(socket, context): +def interface_shutdown(socket: zmq.Socket, context: zmq.Context): """ Close the connection to a SocketInterface instance. diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 9ed8bebd..1075bddd 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -1,3 +1,5 @@ +import threading +from typing import Optional from concurrent.futures import ( Executor as FutureExecutor, Future, @@ -38,7 +40,7 @@ def info(self): def future_queue(self): return self._future_queue - def submit(self, fn, *args, **kwargs): + def submit(self, fn: callable, *args, **kwargs): """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns @@ -51,7 +53,7 @@ def submit(self, fn, *args, **kwargs): self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other @@ -74,7 +76,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._process = None self._future_queue = None - def _set_process(self, process): + def _set_process(self, process: threading.Thread): self._process = process self._process.start() @@ -87,13 +89,13 @@ def __del__(self): except (AttributeError, RuntimeError): pass - def _set_process(self, process): + def _set_process(self, process: threading.Thread): self._process = process self._process.start() class ExecutorBroker(ExecutorBase): - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other @@ -119,13 +121,13 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._process = None self._future_queue = None - def _set_process(self, process): + def _set_process(self, process: threading.Thread): self._process = process for process in self._process: process.start() -def cancel_items_in_queue(que): +def cancel_items_in_queue(que: queue.Queue): """ Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future objects have to be cancelled when the executor shuts down. @@ -143,7 +145,7 @@ def cancel_items_in_queue(que): break -def cloudpickle_register(ind=2): +def cloudpickle_register(ind: int = 2): """ Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to be pickled by value rather than by reference, so the module which calls the map function is pickled by value. @@ -166,11 +168,11 @@ def cloudpickle_register(ind=2): def execute_parallel_tasks( - future_queue, - cores, + future_queue: queue.Queue, + cores: int, interface_class, - hostname_localhost=False, - init_function=None, + hostname_localhost: bool = False, + init_function: Optional[callable] = None, **kwargs, ): """ @@ -199,7 +201,9 @@ def execute_parallel_tasks( ) -def execute_parallel_tasks_loop(interface, future_queue, init_function=None): +def execute_parallel_tasks_loop( + interface, future_queue: queue.Queue, init_function: Optional[callable] = None +): if init_function is not None: interface.send_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} @@ -225,7 +229,7 @@ def execute_parallel_tasks_loop(interface, future_queue, init_function=None): future_queue.task_done() -def _get_backend_path(cores): +def _get_backend_path(cores: int): command_lst = [sys.executable] if cores > 1: command_lst += [_get_command_path(executable="mpiexec.py")] @@ -234,5 +238,5 @@ def _get_backend_path(cores): return command_lst -def _get_command_path(executable): +def _get_command_path(executable: str): return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py index 37be5b82..8fcae945 100644 --- a/pympipool/shared/interface.py +++ b/pympipool/shared/interface.py @@ -1,5 +1,6 @@ from abc import ABC import subprocess +from typing import Optional MPI_COMMAND = "mpiexec" @@ -7,15 +8,15 @@ class BaseInterface(ABC): - def __init__(self, cwd, cores=1, oversubscribe=False): + def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False): self._cwd = cwd self._cores = cores self._oversubscribe = oversubscribe - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): raise NotImplementedError - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): raise NotImplementedError def poll(self): @@ -25,9 +26,9 @@ def poll(self): class SubprocessInterface(BaseInterface): def __init__( self, - cwd=None, - cores=1, - oversubscribe=False, + cwd: Optional[str] = None, + cores: int = 1, + oversubscribe: bool = False, ): super().__init__( cwd=cwd, @@ -36,17 +37,17 @@ def __init__( ) self._process = None - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): self._process = subprocess.Popen( args=self.generate_command(command_lst=command_lst), cwd=self._cwd, stdin=subprocess.DEVNULL, ) - def generate_command(self, command_lst): + def generate_command(self, command_lst: list[str]) -> list[str]: return command_lst - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): self._process.communicate() self._process.terminate() if wait: @@ -58,7 +59,7 @@ def poll(self): class MpiExecInterface(SubprocessInterface): - def generate_command(self, command_lst): + def generate_command(self, command_lst: list[str]): command_prepend_lst = generate_mpiexec_command( cores=self._cores, oversubscribe=self._oversubscribe, @@ -71,12 +72,12 @@ def generate_command(self, command_lst): class SrunInterface(SubprocessInterface): def __init__( self, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, - command_line_argument_lst=[], + cwd: Optional[str] = None, + cores: int = 1, + threads_per_core: int = 1, + gpus_per_core: int = 0, + oversubscribe: bool = False, + command_line_argument_lst: list[str] = [], ): super().__init__( cwd=cwd, @@ -87,7 +88,7 @@ def __init__( self._gpus_per_core = gpus_per_core self._command_line_argument_lst = command_line_argument_lst - def generate_command(self, command_lst): + def generate_command(self, command_lst: list[str]) -> list[str]: command_prepend_lst = generate_slurm_command( cores=self._cores, cwd=self._cwd, @@ -101,7 +102,7 @@ def generate_command(self, command_lst): ) -def generate_mpiexec_command(cores, oversubscribe=False): +def generate_mpiexec_command(cores: int, oversubscribe: bool = False) -> list[str]: if cores == 1: return [] else: @@ -112,13 +113,13 @@ def generate_mpiexec_command(cores, oversubscribe=False): def generate_slurm_command( - cores, - cwd, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, - command_line_argument_lst=[], -): + cores: int, + cwd: str, + threads_per_core: int = 1, + gpus_per_core: int = 0, + oversubscribe: bool = False, + command_line_argument_lst: list[str] = [], +) -> list[str]: command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)] if cwd is not None: command_prepend_lst += ["-D", cwd] diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index aa1c8f95..92e4803a 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -1,3 +1,4 @@ +import queue from concurrent.futures import Future import subprocess @@ -5,7 +6,7 @@ from pympipool.shared.thread import RaisingThread -def execute_single_task(future_queue): +def execute_single_task(future_queue: queue.Queue): """ Process items received via the queue. @@ -59,7 +60,7 @@ class SubprocessExecutor(ExecutorBroker): def __init__( self, - max_workers=1, + max_workers: int = 1, ): super().__init__() self._set_process( diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index adef58b3..1a3dc6c4 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -1,3 +1,6 @@ +import queue +import threading +from typing import Optional from concurrent.futures import Future import subprocess from time import sleep @@ -6,7 +9,7 @@ from pympipool.shared.thread import RaisingThread -def wait_for_process_to_stop(process, sleep_interval=10e-10): +def wait_for_process_to_stop(process: threading.Thread, sleep_interval: float = 10e-10): """ Wait for the subprocess.Popen() process to stop executing @@ -18,7 +21,7 @@ def wait_for_process_to_stop(process, sleep_interval=10e-10): sleep(sleep_interval) -def execute_single_task(future_queue): +def execute_single_task(future_queue: queue.Queue): """ Process items received via the queue. @@ -117,7 +120,12 @@ def __init__(self, *args, **kwargs): ) self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) - def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): + def submit( + self, + string_input: str, + lines_to_read: Optional[int] = None, + stop_read_pattern: Optional[str] = None, + ): """ Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure to identify the completion of the execution. This can either be provided based on the number of lines to read @@ -149,7 +157,7 @@ def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): ) return f - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 0f4351d6..a587b73e 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -1,3 +1,4 @@ +from typing import Optional from pympipool.shared.executorbase import ( execute_parallel_tasks, ExecutorBroker, @@ -52,15 +53,15 @@ class PySlurmExecutor(ExecutorBroker): def __init__( self, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, - hostname_localhost=False, - command_line_argument_lst=[], + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, + hostname_localhost: bool = False, + command_line_argument_lst: list[str] = [], ): super().__init__() self._set_process(